In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ychen_df').getOrCreate()

In [2]:
df = spark.read.json('people.json')

In [3]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [7]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [4]:
from pyspark.ml.feature import VectorAssembler

In [5]:
assember = VectorAssembler(inputCols=['age', 'name'], outputCol='features')

In [8]:
#output = assember.transform(df)

In [28]:
pandas_df = df.toPandas()

In [1]:
#dir(pandas_df)

In [7]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [15]:
df.columns

['age', 'name']

In [17]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [23]:
row = df.head(2)[0]

In [32]:
#dir(row)

In [33]:
df2 = df.select(['name', 'age'])

In [34]:
df2.show()

+-------+----+
|   name| age|
+-------+----+
|Michael|null|
|   Andy|  30|
| Justin|  19|
+-------+----+



In [42]:
df2 = df.withColumn('double_age', df['age']*2)

In [41]:
df2.show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|      null|
|  19| Justin|      null|
+----+-------+----------+



In [43]:
df2

DataFrame[age: bigint, name: string, double_age: bigint]

In [44]:
df.createOrReplaceTempView('people')

In [45]:
res = spark.sql('select * from people')

In [46]:
type(res)

pyspark.sql.dataframe.DataFrame

In [47]:
res.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [9]:
df = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)

In [11]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [12]:
assembler = VectorAssembler(inputCols=['Open', 'High', 'Low', 'Close', 'Volume'], 
                            outputCol='features')

In [13]:
output = assembler.transform(df)

In [14]:
output.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close', 'features']

In [15]:
output.select('features').show()

+--------------------+
|            features|
+--------------------+
|[213.429998,214.4...|
|[214.599998,215.5...|
|[214.379993,215.2...|
|[211.75,212.00000...|
|[210.299994,212.0...|
|[212.799997000000...|
|[209.189994999999...|
|[207.870005,210.9...|
|[210.110002999999...|
|[210.929995000000...|
|[208.330002,215.1...|
|[214.910006,215.5...|
|[212.079994,213.3...|
|[206.780006000000...|
|[202.510002000000...|
|[205.950001000000...|
|[206.849995,210.5...|
|[204.930004,205.5...|
|[201.079996,202.1...|
|[192.369996999999...|
+--------------------+
only showing top 20 rows



In [55]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [60]:
res1 = df.filter(df['Close'] > 200)

In [61]:
res2 = df.filter('Close > 200')

In [62]:
res1.describe()

DataFrame[summary: string, Open: string, High: string, Low: string, Close: string, Volume: string, Adj Close: string]

In [63]:
res2.describe()

DataFrame[summary: string, Open: string, High: string, Low: string, Close: string, Volume: string, Adj Close: string]

In [71]:
res = df.filter((df['Open']<200) & (df['Close']>200)).select(['Open', 'Close'])
res.show()

+----------+------------------+
|      Open|             Close|
+----------+------------------+
|198.109995|200.37999299999998|
|198.229998|            200.66|
|197.380005|        202.000004|
+----------+------------------+



In [72]:
res = df.filter((df['Open']<200) & (df['Close']>200)).select(['Open', 'Close']).collect()
type(res)

list

In [73]:
row = res[0]

In [79]:
row

Row(Open=198.109995, Close=200.37999299999998)

In [80]:
df = spark.read.csv('sales_info.csv', inferSchema=True, header=True)

In [82]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [83]:
df.count()

12

In [86]:
df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [96]:
type(df.groupby('Company').sum())

pyspark.sql.dataframe.DataFrame

In [97]:
df.agg({'Sales': 'sum', 'Sales': 'mean'}).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [98]:
from pyspark.sql.functions import countDistinct, avg, stddev

In [101]:
df.select(avg('Sales')).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [105]:
df.select(avg('Sales').alias('avg_sales')).head(1)[0].asDict()

{'avg_sales': 360.5833333333333}

In [106]:
std = df.select(stddev('Sales').alias('std'))

In [107]:
std.show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [108]:
from pyspark.sql.functions import format_number

In [111]:
std2 = std.select(format_number('std', 2).alias('std_formatted'))

In [112]:
std2.show()

+-------------+
|std_formatted|
+-------------+
|       250.09|
+-------------+



In [116]:
df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [118]:
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [9]:
df = spark.read.csv('ContainsNull.csv', inferSchema=True, header=True)

In [43]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [46]:
df.na.drop(thresh=3).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [13]:
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [19]:
df.na.fill('No Name', subset=['Name']).na.fill(0, subset=['Sales']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John|  0.0|
|emp2|No Name|  0.0|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [39]:
df.collect()[0][:]

('emp1', 'John', None)

In [20]:
from pyspark.sql.functions import mean

In [32]:
mean_val = df.select(mean(df['Sales'])).collect()[0]

In [37]:
type(mean_val)

pyspark.sql.types.Row

In [40]:
mean_val = mean_val[0]

In [42]:
df.na.fill(mean_val, subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [4]:
df = spark.read.csv('appl_stock.csv', header=True, inferSchema=True)

In [5]:
df.select(['Date', 'Open']).show()

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2010-01-04 00:00:00|        213.429998|
|2010-01-05 00:00:00|        214.599998|
|2010-01-06 00:00:00|        214.379993|
|2010-01-07 00:00:00|            211.75|
|2010-01-08 00:00:00|        210.299994|
|2010-01-11 00:00:00|212.79999700000002|
|2010-01-12 00:00:00|209.18999499999998|
|2010-01-13 00:00:00|        207.870005|
|2010-01-14 00:00:00|210.11000299999998|
|2010-01-15 00:00:00|210.92999500000002|
|2010-01-19 00:00:00|        208.330002|
|2010-01-20 00:00:00|        214.910006|
|2010-01-21 00:00:00|        212.079994|
|2010-01-22 00:00:00|206.78000600000001|
|2010-01-25 00:00:00|202.51000200000001|
|2010-01-26 00:00:00|205.95000100000001|
|2010-01-27 00:00:00|        206.849995|
|2010-01-28 00:00:00|        204.930004|
|2010-01-29 00:00:00|        201.079996|
|2010-02-01 00:00:00|192.36999699999998|
+-------------------+------------------+
only showing top

In [7]:
from pyspark.sql.functions import year, mean, format_number

In [20]:
df_new = df.withColumn('Year', year(df['Date']))
df_new.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.0600050000000

In [24]:
df2 = df_new.groupBy('Year').mean().select(['year', 'avg(Close)'])

In [28]:
df2 = df2.withColumnRenamed('avg(Close)', 'Close2')

In [29]:
df2.show()

+----+------------------+
|year|            Close2|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [36]:
result = df2.select(['year', format_number('Close2', 2).alias('Close2')]).orderBy('year')

In [37]:
result.show()

+----+------+
|year|Close2|
+----+------+
|2010|259.84|
|2011|364.00|
|2012|576.05|
|2013|472.63|
|2014|295.40|
|2015|120.04|
|2016|104.60|
+----+------+

