In [1]:
import os
# Point PySpark at a Java 8 runtime before the SparkSession is created below.
# setdefault keeps a JAVA_HOME already exported in the environment and only
# falls back to this machine-specific macOS path when none is set.
# The trailing ';' suppresses the cell output (setdefault returns the value).
os.environ.setdefault('JAVA_HOME', '/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home');

In [2]:
%%bash
# Quote the expansion so a JAVA_HOME containing spaces or glob characters is printed verbatim.
echo "$JAVA_HOME"

/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home


### Session

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Build the SparkSession for this notebook; getOrCreate returns an existing one if present.
spark = (
    SparkSession.builder
    .appName('SparkTutorial')
    .getOrCreate()
)

In [5]:
# Show the concrete class of the session object.
spark.__class__

pyspark.sql.session.SparkSession

In [6]:
# Display the session object itself; its repr does not appear to have been captured in this export.
spark

### Reading JSON

In [7]:
# Load student.json and let Spark infer the column types.
df = spark.read.json(path='student.json')

In [8]:
# Bare expression: the notebook shows the DataFrame repr (column names and types), not its rows.
df

DataFrame[grade: bigint, name: string]

In [9]:
# Print up to the first 20 rows as a text table.
df.show(n=20)

+-----+-----+
|grade| name|
+-----+-----+
|    4| John|
|    9|Marry|
|    7|Peter|
+-----+-----+



In [10]:
# Print the inferred schema as a tree: column names, types and nullability.
df.printSchema()

root
 |-- grade: long (nullable = true)
 |-- name: string (nullable = true)



In [11]:
# Column names, read from the DataFrame's schema.
df.schema.names

['grade', 'name']

In [12]:
# Number of rows in df.
df.count()

3

In [13]:
# describe() returns a new DataFrame of summary statistics; as a bare expression only its schema is displayed.
df.describe()

DataFrame[summary: string, grade: string, name: string]

In [14]:
# Materialize the summary statistics (count, mean, stddev, min, max) as a table.
df.describe().show(n=20)

+-------+-----------------+-----+
|summary|            grade| name|
+-------+-----------------+-----+
|  count|                3|    3|
|   mean|6.666666666666667| null|
| stddev|2.516611478423583| null|
|    min|                4| John|
|    max|                9|Peter|
+-------+-----------------+-----+



In [15]:
# Fetch the first two rows to the driver as a list of Row objects.
df.take(2)

[Row(grade=4, name='John'), Row(grade=9, name='Marry')]

### Define Custom Schema

In [16]:
from pyspark.sql.types import StructField, StringType, StructType, IntegerType

In [17]:
# Explicit schema: the field order here (name, grade) becomes the column order,
# and grade is read as integer instead of the inferred long/bigint.
schema = StructType([
    StructField('name', StringType(), nullable=True),
    StructField('grade', IntegerType(), nullable=True),
])

In [18]:
# Re-read student.json using the explicit schema instead of inference.
df = spark.read.schema(schema).json('student.json')

In [19]:
# Confirm the custom schema was applied: name is a string and grade an integer, in that order.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- grade: integer (nullable = true)



### DataFrame as a SQL Table (Temporary View)

In [20]:
# Register df as a temporary, session-scoped view named 'student' so spark.sql can
# query it; an existing view with the same name is replaced. Nothing is written to disk.
df.createOrReplaceTempView(name='student')

In [21]:
# Query the 'student' view with SQL; the result is a DataFrame, so only its schema is displayed.
spark.sql("SELECT * FROM student")

DataFrame[name: string, grade: int]

In [22]:
# SQL filter on the view; show() prints the matching rows.
spark.sql(
    "SELECT * FROM student WHERE grade > 5"
).show()

+-----+-----+
| name|grade|
+-----+-----+
|Marry|    9|
|Peter|    7|
+-----+-----+



### DataFrame Operations

* select columns
* create a new column
* rename a column

In [23]:
# Selecting a column yields a new single-column DataFrame (only its schema is displayed).
df.select(df['grade'])

DataFrame[grade: int]

In [24]:
# Display the values of the grade column.
df.select(df['grade']).show()

+-----+
|grade|
+-----+
|    4|
|    9|
|    7|
+-----+



In [25]:
# select accepts column names as separate arguments as well as a list.
df.select('name', 'grade').show()

+-----+-----+
| name|grade|
+-----+-----+
| John|    4|
|Marry|    9|
|Peter|    7|
+-----+-----+



In [26]:
# Add new_grade = grade squared; ** on a Column yields a double-typed column.
df.withColumn(colName='new_grade', col=df['grade'] ** 2)

DataFrame[name: string, grade: int, new_grade: double]

In [27]:
# Same derived column, this time materialized with show().
df.withColumn(colName='new_grade', col=df['grade'] ** 2).show()

+-----+-----+---------+
| name|grade|new_grade|
+-----+-----+---------+
| John|    4|     16.0|
|Marry|    9|     81.0|
|Peter|    7|     49.0|
+-----+-----+---------+



In [28]:
# Re-display df: it has no new_grade column, because withColumn returned a new
# DataFrame instead of modifying df.
df.show(n=20)

+-----+-----+
| name|grade|
+-----+-----+
| John|    4|
|Marry|    9|
|Peter|    7|
+-----+-----+



In [29]:
# Rename name -> my_name; returns a new DataFrame (only its schema is displayed).
df.withColumnRenamed(existing='name', new='my_name')

DataFrame[my_name: string, grade: int]

In [30]:
# Display the renamed DataFrame.
df.withColumnRenamed(existing='name', new='my_name').show()

+-------+-----+
|my_name|grade|
+-------+-----+
|   John|    4|
|  Marry|    9|
|  Peter|    7|
+-------+-----+



In [31]:
# df still has the original 'name' column: withColumnRenamed did not modify it.
df.show(n=20)

+-----+-----+
| name|grade|
+-----+-----+
| John|    4|
|Marry|    9|
|Peter|    7|
+-----+-----+



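DataFrames are immutable! `withColumn` and `withColumnRenamed` return a new DataFrame; the original `df` is unchanged, as the `df.show()` outputs above confirm.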

### DataFrame Operations (continued)

* Group by
* Order by
* Special functions

In [32]:
# titanic.csv is tab-separated despite its extension, hence sep='\t'.
# Note: this rebinds df from the student data to the Titanic data.
df = spark.read.csv(
    'titanic.csv',
    sep='\t',
    header=True,
    inferSchema=True,
)

In [33]:
# First 20 rows; long strings such as Name are cut to 20 characters.
df.show(n=20, truncate=True)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [34]:
# Show the types that inferSchema=True assigned to each Titanic column.
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [35]:
# Number of passengers per Sex (groupby is an alias of groupBy).
df.groupby('Sex').count().show()

+------+-----+
|   Sex|count|
+------+-----+
|female|   56|
|  male|  100|
+------+-----+



In [36]:
# Average Age per Sex using the dict form of agg ({column: aggregate function}).
(
    df.groupBy('Sex')
    .agg({'Age': 'mean'})
    .show()
)

+------+------------------+
|   Sex|          avg(Age)|
+------+------------------+
|female| 24.46808510638298|
|  male|30.326962025316455|
+------+------------------+



In [37]:
# Average Age per Sex via GroupedData.mean. Passing 'Age' aggregates only that
# column (result column 'avg(Age)'). The previous form averaged every numeric
# column and then selected 'Avg(Age)', which only resolved because Spark's
# column resolution is case-insensitive by default.
df.groupBy('Sex').mean('Age').show()

+------+------------------+
|   Sex|          Avg(Age)|
+------+------------------+
|female| 24.46808510638298|
|  male|30.326962025316455|
+------+------------------+



In [38]:
# Cheapest fares first (sort is an alias of orderBy; ascending is the default).
df.sort('Fare').show()

+-----------+--------+------+--------------------+------+----+-----+-----+------------------+------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|            Ticket|  Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------------------+------+-----+--------+
|        144|       0|     3| Burke, Mr. Jeremiah|  male|19.0|    0|    0|            365222|  6.75| null|       Q|
|        130|       0|     3|  Ekstrom, Mr. Johan|  male|45.0|    0|    0|            347061| 6.975| null|       S|
|        132|       0|     3|Coelho, Mr. Domin...|  male|20.0|    0|    0|SOTON/O.Q. 3101307|  7.05| null|       S|
|        128|       1|     3|Madsen, Mr. Fridt...|  male|24.0|    0|    0|           C 17369|7.1417| null|       S|
|         20|       1|     3|Masselmani, Mrs. ...|female|null|    0|    0|              2649| 7.225| null|       C|
|         27|       0|     3|Emir, Mr. Farred ...|  male|null|    0|    

In [39]:
# Most expensive fares first.
df.orderBy('Fare', ascending=False).show()

+-----------+--------+------+--------------------+------+----+-----+-----+------------+--------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|      Ticket|    Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+------------+--------+-----------+--------+
|         28|       0|     1|Fortune, Mr. Char...|  male|19.0|    3|    2|       19950|   263.0|C23 C25 C27|       S|
|         89|       1|     1|Fortune, Miss. Ma...|female|23.0|    3|    2|       19950|   263.0|C23 C25 C27|       S|
|        119|       0|     1|Baxter, Mr. Quigg...|  male|24.0|    0|    1|    PC 17558|247.5208|    B58 B60|       C|
|         32|       1|     1|Spencer, Mrs. Wil...|female|null|    1|    0|    PC 17569|146.5208|        B78|       C|
|         63|       0|     1|Harris, Mr. Henry...|  male|45.0|    1|    0|       36973|  83.475|        C83|       S|
|         35|       0|     1|Meyer, Mr. Edgar ...|  male

In [40]:
from pyspark.sql.functions import mean, countDistinct

In [41]:
# Global (ungrouped) mean of Age, labelled 'mean AGE'.
df.agg(mean('Age').alias('mean AGE')).show()

+------------------+
|          mean AGE|
+------------------+
|28.141507936507935|
+------------------+



In [42]:
# Number of distinct values in the Sex column, labelled 'distinct SEX'.
df.agg(countDistinct('Sex').alias('distinct SEX')).show()

+------------+
|distinct SEX|
+------------+
|           2|
+------------+



### Filter Data

In [43]:
# Male passengers only. The column is referenced by its exact name 'Sex' instead
# of relying on Spark's case-insensitive column resolution.
df.filter(df['Sex'] == 'male').show()

+-----------+--------+------+--------------------+----+----+-----+-----+----------+-------+-----------+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|    Ticket|   Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+----------+-------+-----------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|male|22.0|    1|    0| A/5 21171|   7.25|       null|       S|
|          5|       0|     3|Allen, Mr. Willia...|male|35.0|    0|    0|    373450|   8.05|       null|       S|
|          6|       0|     3|    Moran, Mr. James|male|null|    0|    0|    330877| 8.4583|       null|       Q|
|          7|       0|     1|McCarthy, Mr. Tim...|male|54.0|    0|    0|     17463|51.8625|        E46|       S|
|          8|       0|     3|Palsson, Master. ...|male| 2.0|    3|    1|    349909| 21.075|       null|       S|
|         13|       0|     3|Saundercock, Mr. ...|male|20.0|    0|    0| A/5. 2151|   8.05|     

In [44]:
# Male passengers older than 40. Each condition is parenthesized because & binds
# tighter than == and > in Python; 'Sex' uses the column's exact name.
df.filter((df['Sex'] == 'male') & (df['Age'] > 40)).show()

+-----------+--------+------+--------------------+----+----+-----+-----+-----------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|     Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+-----------+-------+-----+--------+
|          7|       0|     1|McCarthy, Mr. Tim...|male|54.0|    0|    0|      17463|51.8625|  E46|       S|
|         34|       0|     2|Wheadon, Mr. Edwa...|male|66.0|    0|    0| C.A. 24579|   10.5| null|       S|
|         36|       0|     1|Holverson, Mr. Al...|male|42.0|    1|    0|     113789|   52.0| null|       S|
|         55|       0|     1|Ostby, Mr. Engelh...|male|65.0|    0|    1|     113509|61.9792|  B30|       C|
|         63|       0|     1|Harris, Mr. Henry...|male|45.0|    1|    0|      36973| 83.475|  C83|       S|
|         93|       0|     1|Chaffee, Mr. Herb...|male|46.0|    1|    0|W.E.P. 5734| 61.175|  E31|       S|
|         95|       0|     3

### Missing Data

In [45]:
# missing-data.csv has no header row, so Spark names the columns _c0, _c1, _c2.
df = spark.read.csv(
    'missing-data.csv',
    header=False,
    inferSchema=True,
)

In [46]:
# Display the raw data including its nulls.
df.show(n=20)

+----+----+----+
| _c0| _c1| _c2|
+----+----+----+
| 5.0|   3| 2.5|
| 2.6|null| 4.0|
|null|null| 4.3|
| 7.0|   3|null|
+----+----+----+



In [47]:
# Drop every row that contains at least one null (how='any' is the default).
df.na.drop().show()

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|5.0|  3|2.5|
+---+---+---+



In [48]:
# Keep only rows with at least 2 non-null values.
df.na.drop(thresh=2).show()

+---+----+----+
|_c0| _c1| _c2|
+---+----+----+
|5.0|   3| 2.5|
|2.6|null| 4.0|
|7.0|   3|null|
+---+----+----+



In [49]:
# Drop only rows where every column is null; no row qualifies here, so all rows remain.
df.na.drop(how='all').show()

+----+----+----+
| _c0| _c1| _c2|
+----+----+----+
| 5.0|   3| 2.5|
| 2.6|null| 4.0|
|null|null| 4.3|
| 7.0|   3|null|
+----+----+----+



In [50]:
# Replace nulls with 0 (displayed as 0.0 in the double columns).
df.na.fill(0).show()

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|5.0|  3|2.5|
|2.6|  0|4.0|
|0.0|  0|4.3|
|7.0|  3|0.0|
+---+---+---+



### Datetime Data

In [51]:
# stock-data.csv has a header row; inferSchema parses Date as a timestamp and the prices as doubles.
df = spark.read.csv(
    'stock-data.csv',
    header=True,
    inferSchema=True,
)

In [52]:
# First 20 rows of the daily stock prices.
df.show(n=20)

+-------------------+----------+----------+---------+-----------+
|               Date|Open Price|High Price|Low Price|Close Price|
+-------------------+----------+----------+---------+-----------+
|2018-02-28 00:00:00|     142.0|     151.7|    142.0|     148.55|
|2018-02-27 00:00:00|    149.15|     154.0|    149.1|      150.0|
|2018-02-26 00:00:00|     151.6|     152.0|    151.2|     151.35|
|2018-02-25 00:00:00|     151.3|     154.0|    151.1|      153.2|
|2018-02-24 00:00:00|     152.0|     153.6|    152.0|     152.65|
|2018-02-23 00:00:00|     154.0|     154.0|    151.3|      153.2|
|2018-02-22 00:00:00|     157.2|     157.2|   156.85|     156.95|
|2018-02-21 00:00:00|     154.7|     160.0|   152.55|     155.45|
|2018-02-20 00:00:00|     161.0|     161.5|    158.0|      160.2|
|2018-02-19 00:00:00|     157.5|    164.05|    157.5|      161.5|
|2018-02-18 00:00:00|     161.9|    164.95|    160.0|      160.9|
|2018-02-17 00:00:00|     161.4|     171.1|    160.9|     163.75|
+---------

In [53]:
# Confirm that Date was inferred as a timestamp, which the datetime functions below operate on.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open Price: double (nullable = true)
 |-- High Price: double (nullable = true)
 |-- Low Price: double (nullable = true)
 |-- Close Price: double (nullable = true)



In [54]:
from pyspark.sql.functions import dayofmonth, dayofyear, weekofyear, hour, month, year

In [55]:
# Extract the year from the Date timestamp. 'Date' is the column's exact name;
# 'date' only resolved because Spark's column resolution is case-insensitive by default.
df.select(year('Date')).show()

+----------+
|year(date)|
+----------+
|      2018|
|      2018|
|      2018|
|      2018|
|      2018|
|      2018|
|      2018|
|      2018|
|      2018|
|      2018|
|      2018|
|      2018|
+----------+



In [56]:
# Day of the month (1-31) from the Date timestamp, referenced by its exact column name.
df.select(dayofmonth('Date')).show()

+----------------+
|dayofmonth(date)|
+----------------+
|              28|
|              27|
|              26|
|              25|
|              24|
|              23|
|              22|
|              21|
|              20|
|              19|
|              18|
|              17|
+----------------+



In [57]:
# Append the day of the year (e.g. 59 for 2018-02-28) as a new column,
# referencing Date by its exact column name.
df.withColumn('day of year', dayofyear('Date')).show()

+-------------------+----------+----------+---------+-----------+-----------+
|               Date|Open Price|High Price|Low Price|Close Price|day of year|
+-------------------+----------+----------+---------+-----------+-----------+
|2018-02-28 00:00:00|     142.0|     151.7|    142.0|     148.55|         59|
|2018-02-27 00:00:00|    149.15|     154.0|    149.1|      150.0|         58|
|2018-02-26 00:00:00|     151.6|     152.0|    151.2|     151.35|         57|
|2018-02-25 00:00:00|     151.3|     154.0|    151.1|      153.2|         56|
|2018-02-24 00:00:00|     152.0|     153.6|    152.0|     152.65|         55|
|2018-02-23 00:00:00|     154.0|     154.0|    151.3|      153.2|         54|
|2018-02-22 00:00:00|     157.2|     157.2|   156.85|     156.95|         53|
|2018-02-21 00:00:00|     154.7|     160.0|   152.55|     155.45|         52|
|2018-02-20 00:00:00|     161.0|     161.5|    158.0|      160.2|         51|
|2018-02-19 00:00:00|     157.5|    164.05|    157.5|      161.5

In [58]:
# Keep rows dated before the 50th day of the year (before 2018-02-19 in this data),
# referencing Date by its exact column name.
df.filter(dayofyear('Date') < 50).show()

+-------------------+----------+----------+---------+-----------+
|               Date|Open Price|High Price|Low Price|Close Price|
+-------------------+----------+----------+---------+-----------+
|2018-02-18 00:00:00|     161.9|    164.95|    160.0|      160.9|
|2018-02-17 00:00:00|     161.4|     171.1|    160.9|     163.75|
+-------------------+----------+----------+---------+-----------+

