In [3]:
from pyspark import SparkContext,SparkConf
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

import numpy
import pandas as pd
from pandas import DataFrame as df
import difflib
import datetime

# Making Dataframe from Dict of Dictionaries

In [4]:
data1 = {'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
         'Name': {0: 'Owen', 1: 'Florence', 2: 'Laina', 3: 'Lily', 4: 'William'},
         'Gender': {0:'M', 1: 'F', 2: 'F', 3: 'F', 4: 'M'},
         'Survived': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0}}

data2 = {'PassengerId': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},
         'Age': {0: 22, 1: 38, 2: 26, 3: 35, 4: 35},
         'Fare': {0: 7.3, 1: 71.3, 2: 7.9, 3: 53.1, 4: 8.0},
         'Pclass': {0: 3, 1: 1, 2: 3, 3: 1, 4: 3}}

df1_pd = pd.DataFrame(data1, columns=data1.keys())
df2_pd = pd.DataFrame(data2, columns=data2.keys())

### Use HiveContext to createDataFrame 

In [33]:
hc = HiveContext(sc)

In [34]:
df1 = hc.createDataFrame(df1_pd)

In [35]:
df2 = hc.createDataFrame(df2_pd)

### printing Schema of dataframe

In [36]:
df1.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Survived: long (nullable = true)



### Creating dataframe with defining Schema

In [97]:
df1_schema = StructType([StructField('PassengerId',IntegerType(),True),
                         StructField('Name',StringType(),True),
                         StructField('Gender',StringType(),True),
                         StructField('Survived',IntegerType(),True)])
# for date type: StructField('MeetingDate',DateType(),True) ])
#True denotes that data field is nullable


In [98]:
df1_spark = hc.createDataFrame(df1_pd,df1_schema)

In [99]:
df1_spark.show()

+-----------+--------+------+--------+
|PassengerId|    Name|Gender|Survived|
+-----------+--------+------+--------+
|          1|    Owen|     M|       0|
|          2|Florence|     F|       1|
|          3|   Laina|     F|       1|
|          4|    Lily|     F|       1|
|          5| William|     M|       0|
+-----------+--------+------+--------+



In [100]:
df2_schema = StructType([StructField('PassengerId',IntegerType(),True),
                         StructField('Age',IntegerType(),True),
                         StructField('Fare',DoubleType(),True),
                         StructField('PClass',IntegerType(),True)])

In [101]:
df2_spark = hc.createDataFrame(df2_pd,df2_schema)

In [102]:
df2_spark.show()

+-----------+---+----+------+
|PassengerId|Age|Fare|PClass|
+-----------+---+----+------+
|          1| 22| 7.3|     3|
|          2| 38|71.3|     1|
|          3| 26| 7.9|     3|
|          4| 35|53.1|     1|
|          5| 35| 8.0|     3|
+-----------+---+----+------+



### Different Datatypes available

BinaryType, BooleanType, ByteType, DoubleType, DateType, FloatType, IntegerType 

### Changing DataTypes

In [103]:
from pyspark.sql.types import IntegerType, DateType

In [104]:
#withColumn(ColName, Col)

In [105]:
df1_spark_1 = df1_spark.withColumn("PassengerId", df1_spark["PassengerId"].cast(StringType()))

In [106]:
df1_spark_1.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Survived: integer (nullable = true)



In [107]:
df1_spark.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Survived: integer (nullable = true)



### Rename Column Name

In [108]:
df1_spark = df1_spark.withColumnRenamed('Gender','Sex')

In [109]:
df1_spark.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Survived: integer (nullable = true)



### Modifying Column Values and Creating new Columns

#### M to Male and F to Female in Sex Column

In [110]:
df1_spark_2 = df1_spark.withColumn('Sex',F.when(trim(df1_spark['Sex']) == 'M','Male')\
                                 .otherwise(F.when(trim(df1_spark['Sex']) == 'F','Female').otherwise('NA')))

In [111]:
df1_spark_2.show()

+-----------+--------+------+--------+
|PassengerId|    Name|   Sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  Male|       0|
|          2|Florence|Female|       1|
|          3|   Laina|Female|       1|
|          4|    Lily|Female|       1|
|          5| William|  Male|       0|
+-----------+--------+------+--------+



#### Create New column 'type_of_data' with constant value: 'train'

In [115]:
df1_spark_2 = df1_spark_2.withColumn('type_of_data',lit('train'))

In [None]:
# lit() creates a column of literal values

In [116]:
df1_spark_2.show()

+-----------+--------+------+--------+------------+
|PassengerId|    Name|   Sex|Survived|type_of_data|
+-----------+--------+------+--------+------------+
|          1|    Owen|  Male|       0|       train|
|          2|Florence|Female|       1|       train|
|          3|   Laina|Female|       1|       train|
|          4|    Lily|Female|       1|       train|
|          5| William|  Male|       0|       train|
+-----------+--------+------+--------+------------+



### Ordering DataFrame by specific Column - PassengerID

In [120]:
df1_spark_2.sort(df1_spark_2.PassengerId.desc()).show()

+-----------+--------+------+--------+------------+
|PassengerId|    Name|   Sex|Survived|type_of_data|
+-----------+--------+------+--------+------------+
|          5| William|  Male|       0|       train|
|          4|    Lily|Female|       1|       train|
|          3|   Laina|Female|       1|       train|
|          2|Florence|Female|       1|       train|
|          1|    Owen|  Male|       0|       train|
+-----------+--------+------+--------+------------+



In [123]:
df1_spark_2.sort('PassengerId',ascending=False).show()

+-----------+--------+------+--------+------------+
|PassengerId|    Name|   Sex|Survived|type_of_data|
+-----------+--------+------+--------+------------+
|          5| William|  Male|       0|       train|
|          4|    Lily|Female|       1|       train|
|          3|   Laina|Female|       1|       train|
|          2|Florence|Female|       1|       train|
|          1|    Owen|  Male|       0|       train|
+-----------+--------+------+--------+------------+



### Filter by Specific Column

#### Filter out female population
Filter takes column expression or SQL expression

Using Columns Expression

In [128]:
female_data = df1_spark_2.filter(trim(col('Sex'))=='Female')

In [129]:
female_data.show()

+-----------+--------+------+--------+------------+
|PassengerId|    Name|   Sex|Survived|type_of_data|
+-----------+--------+------+--------+------------+
|          2|Florence|Female|       1|       train|
|          3|   Laina|Female|       1|       train|
|          4|    Lily|Female|       1|       train|
+-----------+--------+------+--------+------------+



Using SQL Expression

#### col('columnname') = dataframe['columnname'] = dataframe.columnname

In [131]:
female_data = df1_spark_2a.filter("Sex='Female'")

In [132]:
female_data.show()

+-----------+--------+------+--------+------------+
|PassengerId|    Name|   Sex|Survived|type_of_data|
+-----------+--------+------+--------+------------+
|          2|Florence|Female|       1|       train|
|          3|   Laina|Female|       1|       train|
|          4|    Lily|Female|       1|       train|
+-----------+--------+------+--------+------------+



### Summarize / Aggregate and GroupBy

#### Group by Passenger class and find avg fare and age

In [133]:
gdf2_spark = df2_spark.groupby('PClass')

In [135]:
avg_col = ['Age','Fare']
gdf2_spark.avg(*avg_col).show()

+------+------------------+-----------------+
|PClass|          avg(Age)|        avg(Fare)|
+------+------------------+-----------------+
|     1|              36.5|             62.2|
|     3|27.666666666666668|7.733333333333333|
+------+------------------+-----------------+



#### Group By Passenger Class and find Total Fare and Average Age and count of passengers

In [142]:
gdf2_spark.agg({'*':'count','Age':'Mean','Fare':'Sum'}).show() #avg/Avg/Mean sum/Sum

+------+--------+------------------+---------+
|PClass|count(1)|          avg(Age)|sum(Fare)|
+------+--------+------------------+---------+
|     1|       2|              36.5|    124.4|
|     3|       3|27.666666666666668|     23.2|
+------+--------+------------------+---------+



#### Rename the columns count(1), avg(Age) etc

In [145]:
gdf2_spark.agg({'*':'count','Age':'Mean','Fare':'Sum'}).toDF('PClass','Count','Avg Age','Total Fare').show()

+------+-----+------------------+----------+
|PClass|Count|           Avg Age|Total Fare|
+------+-----+------------------+----------+
|     1|    2|              36.5|     124.4|
|     3|    3|27.666666666666668|      23.2|
+------+-----+------------------+----------+



#### Arrange passenger class by total fare in ascending

In [147]:
gdf2_spark.agg({'Fare':'Sum'}).sort('sum(Fare)',ascending=True).show()

+------+---------+
|PClass|sum(Fare)|
+------+---------+
|     3|     23.2|
|     1|    124.4|
+------+---------+



#### Get Count of distinct Passenger classes

In [157]:
df2_spark.select('PClass').distinct().count()

2

In [168]:
df2_spark.select('Pclass').distinct().show()

+------+
|Pclass|
+------+
|     1|
|     3|
+------+



#### Get Count of 'Distinct' Passenger Ids by Sex

In [169]:
gdf1_spark = df1_spark_2.groupby('Sex')

####  Like F.countDistinct, F.sum, F.avg, F.max, etc sql functions can be used. 

In [170]:
gdf1_spark.agg({'*':'Count'}).show()a

+------+--------+
|   Sex|count(1)|
+------+--------+
|Female|       3|
|  Male|       2|
+------+--------+



In [174]:
gdf1_spark.agg(F.countDistinct('PassengerId')).show()

+------+---------------------------+
|   Sex|count(DISTINCT PassengerId)|
+------+---------------------------+
|Female|                          3|
|  Male|                          2|
+------+---------------------------+



In [178]:
gdf2_spark.agg(F.sum('Fare')).show()

+------+---------+
|PClass|sum(Fare)|
+------+---------+
|     1|    124.4|
|     3|     23.2|
+------+---------+



### Joins 

#### There are two ways to combine dataframes --- joins and unions. The idea here is the same as joining and unioning tables in SQL

#### Join the two titanic dataframes by the column PassengerId

In [179]:
df1_spark.join(df2_spark,['PassengerId']).show()

+-----------+--------+---+--------+---+----+------+
|PassengerId|    Name|Sex|Survived|Age|Fare|PClass|
+-----------+--------+---+--------+---+----+------+
|          1|    Owen|  M|       0| 22| 7.3|     3|
|          3|   Laina|  F|       1| 26| 7.9|     3|
|          5| William|  M|       0| 35| 8.0|     3|
|          4|    Lily|  F|       1| 35|53.1|     1|
|          2|Florence|  F|       1| 38|71.3|     1|
+-----------+--------+---+--------+---+----+------+



In [182]:
df1_spark.join(df2_spark,trim(df1_spark['PassengerID'])==trim(df2_spark['PassengerID']),'left_outer').show()

+-----------+--------+---+--------+-----------+---+----+------+
|PassengerId|    Name|Sex|Survived|PassengerId|Age|Fare|PClass|
+-----------+--------+---+--------+-----------+---+----+------+
|          3|   Laina|  F|       1|          3| 26| 7.9|     3|
|          5| William|  M|       0|          5| 35| 8.0|     3|
|          1|    Owen|  M|       0|          1| 22| 7.3|     3|
|          4|    Lily|  F|       1|          4| 35|53.1|     1|
|          2|Florence|  F|       1|          2| 38|71.3|     1|
+-----------+--------+---+--------+-----------+---+----+------+



#### Note : Joins using condition create duplicate columns with same name; it creates problem later while storing back to hadoop database , etc hence better to drop one of it

In [184]:
df1_spark.join(df2_spark, trim(df1_spark['PassengerId']) == trim(df2_spark['PassengerId']), 'left_outer').drop(df2_spark['PassengerId']).show()

+-----------+--------+---+--------+---+----+------+
|PassengerId|    Name|Sex|Survived|Age|Fare|PClass|
+-----------+--------+---+--------+---+----+------+
|          3|   Laina|  F|       1| 26| 7.9|     3|
|          5| William|  M|       0| 35| 8.0|     3|
|          1|    Owen|  M|       0| 22| 7.3|     3|
|          4|    Lily|  F|       1| 35|53.1|     1|
|          2|Florence|  F|       1| 38|71.3|     1|
+-----------+--------+---+--------+---+----+------+



### Nonequi joins - with conditions like >,>=,<,<=

In [185]:
df1_spark.join(df2_spark,trim(df1_spark['PassengerId'])>trim(df2_spark['PassengerId'])).show()

+-----------+--------+---+--------+-----------+---+----+------+
|PassengerId|    Name|Sex|Survived|PassengerId|Age|Fare|PClass|
+-----------+--------+---+--------+-----------+---+----+------+
|          2|Florence|  F|       1|          1| 22| 7.3|     3|
|          3|   Laina|  F|       1|          1| 22| 7.3|     3|
|          3|   Laina|  F|       1|          2| 38|71.3|     1|
|          4|    Lily|  F|       1|          1| 22| 7.3|     3|
|          5| William|  M|       0|          1| 22| 7.3|     3|
|          4|    Lily|  F|       1|          2| 38|71.3|     1|
|          5| William|  M|       0|          2| 38|71.3|     1|
|          4|    Lily|  F|       1|          3| 26| 7.9|     3|
|          5| William|  M|       0|          3| 26| 7.9|     3|
|          5| William|  M|       0|          4| 35|53.1|     1|
+-----------+--------+---+--------+-----------+---+----+------+



### Unions

In [186]:
df1_spark.union(df2_spark).show()

+-----------+--------+----+--------+
|PassengerId|    Name| Sex|Survived|
+-----------+--------+----+--------+
|          1|    Owen|   M|       0|
|          2|Florence|   F|       1|
|          3|   Laina|   F|       1|
|          4|    Lily|   F|       1|
|          5| William|   M|       0|
|          1|      22| 7.3|       3|
|          2|      38|71.3|       1|
|          3|      26| 7.9|       3|
|          4|      35|53.1|       1|
|          5|      35| 8.0|       3|
+-----------+--------+----+--------+



### Transform A DataFrame Column (using UDF)

In [187]:
from pyspark.sql.functions import udf

derive_perc = udf(lambda x: '$'+str(x*100))

df2_spark_1 = df2_spark.withColumn('fare_3_times',derive_perc(df2_spark['Fare']))

df2_spark_1.show()

+-----------+---+----+------+------------+
|PassengerId|Age|Fare|PClass|fare_3_times|
+-----------+---+----+------+------------+
|          1| 22| 7.3|     3|      $730.0|
|          2| 38|71.3|     1|     $7130.0|
|          3| 26| 7.9|     3|      $790.0|
|          4| 35|53.1|     1|     $5310.0|
|          5| 35| 8.0|     3|      $800.0|
+-----------+---+----+------+------------+



### Using SparkSQL

---- yet to implement in my notebook ----