## DataFrames for Data Scientists
1. SparkContext()
1. Read/Write
1. Convert
1. Columns & Rows
1. DataFrame : RDD-like Operations
1. DataFrame : Action
1. DataFrame : Scientific Functions
1. DataFrame : Statistical Functions
1. DataFrame : Aggregate Functions
1. DataFrame : na
1. DataFrame : Joins, Set Operations
1. DataFrame : Tables & SQL

## 1. SparkContext()

In [None]:
import datetime
from pytz import timezone
# Stamp the output with when the notebook was last run, in US/Pacific time.
# The function-call form of print works on both Python 2 and Python 3 kernels
# (the `print "..."` statement form is a SyntaxError on Python 3).
print("Last run @%s" % (datetime.datetime.now(timezone('US/Pacific'))))

In [None]:
from pyspark.context import SparkContext
# `sc` is never created in this notebook; it is assumed to be the SparkContext
# pre-built by the pyspark shell / notebook kernel.
print("Running Spark Version %s" % (sc.version))

In [None]:
from pyspark.conf import SparkConf
# Dump the key/value settings held by a newly constructed SparkConf.
# NOTE(review): this is a fresh SparkConf, not sc.getConf(), so it may not
# reflect every setting of the already-running context -- confirm if that matters.
conf = SparkConf()
print(conf.toDebugString())

In [None]:
# Build an SQLContext on top of the shell-provided SparkContext `sc`.
# Import SQLContext explicitly: no cell binds the bare name `pyspark`
# (`from pyspark.x import y` only binds `y`).
from pyspark.sql import SQLContext
sqlCxt = SQLContext(sc)

## 2. Read/Write

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('spark-csv/cars.csv')
df.coalesce(1).select('year', 'model').write.format('com.databricks.spark.csv').save('newcars.csv')

In [None]:
df.show()

In [None]:
df_cars = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('car-data/car-milage.csv')

In [None]:
df_cars_x = sqlContext.read.load('cars_1.parquet')
df_cars_x.dtypes

In [None]:
df_cars.show(40)

In [None]:
df_cars.describe().show()

In [None]:
df_cars.describe(["mpg",'hp']).show()

In [None]:
# Average mpg per value of the `automatic` column.
# df_cars comes straight from the CSV reader without schema inference, so its
# columns are presumably strings (later cells cast them explicitly), and
# GroupedData.avg() rejects non-numeric columns. Cast mpg to double first and
# keep the conventional "avg(mpg)" output name; .show() prints the rows instead
# of only echoing the DataFrame's schema.
import pyspark.sql.functions as F
df_cars.groupby("automatic").agg(F.avg(df_cars.mpg.cast("double")).alias("avg(mpg)")).show()

In [None]:
df_cars.na.drop('any').count()

In [None]:
df_cars.count()

In [None]:
df_cars.dtypes

In [None]:
df_2 = df_cars.select(df_cars.mpg.cast("double").alias('mpg'),df_cars.torque.cast("double").alias('torque'),
                     df_cars.automatic.cast("integer").alias('automatic'))

In [None]:
df_2.show(40)

In [None]:
df_2.dtypes

In [None]:
df_2.describe().show()

## 9. DataFrame : Aggregate Functions

In [None]:
df_2.groupby("automatic").avg("mpg","torque").show()

In [None]:
df_2.groupBy().avg("mpg","torque").show()

In [None]:
df_2.agg({"*":"count"}).show()

In [None]:
import pyspark.sql.functions as F
df_2.agg(F.min(df_2.mpg)).show()

In [None]:
import pyspark.sql.functions as F
df_2.agg(F.mean(df_2.mpg)).show()

In [None]:
# Minimum mpg per `automatic` value, using the dictionary form of agg().
# Only .show() is needed: an extra .collect() whose result is discarded would
# just run the same Spark job a second time.
gdf_2 = df_2.groupBy("automatic")
gdf_2.agg({'mpg': 'min'}).show()

In [None]:
df_cars_1 = df_cars.select(df_cars.mpg.cast("double").alias('mpg'),
                           df_cars.displacement.cast("double").alias('displacement'),
                           df_cars.hp.cast("integer").alias('hp'),
                           df_cars.torque.cast("integer").alias('torque'),
                           df_cars.CRatio.cast("float").alias('CRatio'),
                           df_cars.RARatio.cast("float").alias('RARatio'),
                           df_cars.CarbBarrells.cast("integer").alias('CarbBarrells'),
                           df_cars.NoOfSpeed.cast("integer").alias('NoOfSpeed'),
                           df_cars.length.cast("float").alias('length'),
                           df_cars.width.cast("float").alias('width'),
                           df_cars.weight.cast("integer").alias('weight'),
                           df_cars.automatic.cast("integer").alias('automatic'))

In [None]:
gdf_3 = df_cars_1.groupBy("automatic")
gdf_3.agg({'mpg':'mean'}).show()

In [None]:
# Whole-table averages of mpg and torque. A DataFrame itself has no .avg()
# method (that belongs to the grouped object returned by groupBy()), so call
# the aggregate functions explicitly.
import pyspark.sql.functions as F
df_cars_1.agg(F.avg("mpg"), F.avg("torque")).show()

In [None]:
df_cars_1.groupBy().avg("mpg","torque").show()

In [None]:
df_cars_1.groupby("automatic").avg("mpg","torque").show()

In [None]:
df_cars_1.groupby("automatic").avg("mpg","torque","hp","weight").show()

In [None]:
df_cars_1.printSchema()

In [None]:
df_cars_1.show(5)

In [None]:
df_cars_1.describe().show()

In [None]:
df_cars_1.groupBy().agg({"mpg":"mean"}).show()

In [None]:
df_cars_1.show(40)

## 8. DataFrame : Statistical Functions

In [None]:
df_cars_1.corr('hp','weight')

In [None]:
df_cars_1.corr('RARatio','width')

In [None]:
df_cars_1.crosstab('automatic','NoOfSpeed').show()

In [None]:
df_cars_1.crosstab('NoOfSpeed','CarbBarrells').show()

In [None]:
df_cars_1.crosstab('automatic','CarbBarrells').show()

## 10. DataFrame : na

In [None]:
# We can see if a column has null values
df_cars_1.select(df_cars_1.torque.isNull()).show()

In [None]:
# We can filter null and non null rows
df_cars_1.filter(df_cars_1.torque.isNull()).show(40) # You can also use isNotNull

In [None]:
df_cars_1.na.drop().count()

In [None]:
df_cars_1.fillna(9999).show(50)
# This is not what we will do normally. Just to show the effect of fillna
# you can use df_cars_1.na.fill(9999)

In [None]:
# Let us try the interesting when syntax on the HP column
# 0-100=1,101-200=2,201-300=3,others=4
df_cars_1.select(df_cars_1.hp, F.when(df_cars_1.hp <= 100, 1).when(df_cars_1.hp <= 200, 2)
                 .when(df_cars_1.hp <= 300, 3).otherwise(4).alias("hpCode")).show(40)

In [None]:
df_cars_1.dtypes

In [None]:
df_cars_1.groupBy('CarbBarrells').count().show()

In [None]:
# If file exists, will give error
# java.lang.RuntimeException: path file:.. /cars_1.parquet already exists.
#
df_cars_1.repartition(1).write.save("cars_1.parquet", format="parquet")

In [None]:
# No error even if the file exists
df_cars_1.repartition(1).write.mode("overwrite").format("parquet").save("cars_1.parquet")
# Use repartition if you want all data in one (or more) file 

In [None]:
# Appends to existing file
df_cars_1.repartition(1).write.mode("append").format("parquet").save("cars_1_a.parquet")
# Even with repartition, you will see more files as it is append

In [None]:
df_append = sqlContext.load("cars_1_a.parquet")
# sqlContext.load is deprecated

In [None]:
df_append.count()

In [None]:
#eventhough parquet is the default format, explicit format("parquet") is clearer
df_append = sqlContext.read.format("parquet").load("cars_1_a.parquet")
df_append.count()

In [None]:
# for reading parquet files read.parquet is more elegant
df_append = sqlContext.read.parquet("cars_1_a.parquet")
df_append.count()

In [None]:
# Let us read another file
df_orders = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('NW/NW-Orders.csv')

In [None]:
df_orders.head()

In [None]:
df_orders.dtypes

In [None]:
from pyspark.sql.types import StringType, IntegerType, DateType
# Bind the datetime *module* (not the class) so the name stays compatible with the
# first cell's datetime.datetime.now(...); `from datetime import datetime` here
# would rebind `datetime` to the class and break that cell on re-run.
import datetime

# UDF: last two characters of an 'MM/DD/YY' order date, i.e. the two-digit year,
# returned as a string. Null dates stay null instead of raising TypeError.
# (Declaring IntegerType() would additionally need an int() conversion.)
getYear = F.udf(lambda s: s[-2:] if s is not None else None, StringType())

# UDF: parse an 'MM/DD/YY' string into a datetime.date for a DateType column.
# Null dates stay null instead of making strptime raise.
convertToDate = F.udf(
    lambda s: datetime.datetime.strptime(s, '%m/%d/%y').date() if s is not None else None,
    DateType())

In [None]:
# You could register the function for sql as follows. We won't use this here
sqlContext.registerFunction("getYear", lambda s: s[-2:])

In [None]:
# let us add an year column
df_orders.select(df_orders['OrderID'], 
                 df_orders['CustomerID'],
                 df_orders['EmpliyeeID'], 
                 df_orders['OrderDate'],
                 df_orders['ShipCuntry'].alias('ShipCountry'),
                 getYear(df_orders['OrderDate'])).show()

In [None]:
# let us add an year column
# Need alias
df_orders_1 = df_orders.select(df_orders['OrderID'], 
                 df_orders['CustomerID'],
                 df_orders['EmpliyeeID'], 
                 convertToDate(df_orders['OrderDate']).alias('OrderDate'),
                 df_orders['ShipCuntry'].alias('ShipCountry'),
                 getYear(df_orders['OrderDate']).alias('Year'))
# df_orders_1 = df_orders_x.withColumn('Year',getYear(df_orders_x['OrderDate'])) # doesn't work. Gives error

In [None]:
df_orders_1.show(1)

In [None]:
df_orders_1.dtypes

In [None]:
df_orders_1.show()

In [None]:
df_orders_1.where(df_orders_1['ShipCountry'] == 'France').show()

In [None]:
df_orders_1.groupBy("CustomerID","Year").count().orderBy('count',ascending=False).show()

In [None]:
df_orders_1.groupBy("CustomerID","Year").count().orderBy('count',ascending=False).show()

In [None]:
# save by partition (year)
df_orders_1.write.mode("overwrite").partitionBy("Year").format("parquet").save("orders_1.parquet")
# load defaults to parquet

In [None]:
df_orders_2 = sqlContext.read.parquet("orders_1.parquet")
df_orders_2.explain(True)
df_orders_3 = df_orders_2.filter(df_orders_2.Year=='96')
df_orders_3.explain(True)

In [None]:
df_orders_3.count()

In [None]:
df_orders_3.explain(True)

In [None]:
df_orders_2.count()

In [None]:
df_orders_1.printSchema()

## 7. DataFrame : Scientific Functions

In [None]:
# import pyspark.sql.Row
df = sc.parallelize([10,100,1000]).map(lambda x: {"num":x}).toDF()

In [None]:
df.show()

In [None]:
import pyspark.sql.functions as F
df.select(F.log(df.num)).show()

In [None]:
df.select(F.log10(df.num)).show()

In [None]:
df = sc.parallelize([0,10,100,1000]).map(lambda x: {"num":x}).toDF()

In [None]:
df.show()

In [None]:
df.select(F.log(df.num)).show()

In [None]:
df.select(F.log1p(df.num)).show()

In [None]:
df_cars_1.select(df_cars_1['CarbBarrells'], F.sqrt(df_cars_1['mpg'])).show()

In [None]:
df = sc.parallelize([(3,4),(5,12),(7,24),(9,40),(11,60),(13,84)]).map(lambda x: {"a":x[0],"b":x[1]}).toDF()

In [None]:
df.show()

In [None]:
df.select(df['a'],df['b'],F.hypot(df['a'],df['b']).alias('hypot')).show()

## 11. DataFrame : Joins, Set Operations

In [None]:
df_a = sc.parallelize( [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"C","X2":3}] ).toDF()
df_b = sc.parallelize( [{"X1":"A","X3":True},{"X1":"B","X3":False},{"X1":"D","X3":True}] ).toDF()

In [None]:
df_a.show()

In [None]:
df_b.show()

In [None]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'inner')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()

In [None]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'outer')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show() # same as 'full' or 'fullouter'
# Spark doesn't merge the key columns and so need to alias the column names to distinguih between the columns

In [None]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'left_outer')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show() # same as 'left'

In [None]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'right_outer')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show() # same as 'right'

In [None]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'right')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()

In [None]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'full')\
.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()# same as 'fullouter'

In [None]:
df_a.join(df_b, df_a['X1'] == df_b['X1'], 'leftsemi').show() # same as semijoin
#.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()

In [None]:
#anti-join = df.subtract('leftsemi')
df_a.subtract(df_a.join(df_b, df_a['X1'] == df_b['X1'], 'leftsemi')).show() 
#.select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()

In [None]:
c = [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"C","X2":3}]
d = [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"D","X2":4}]
df_c = sc.parallelize(c).toDF()
df_d = sc.parallelize(d).toDF()

In [None]:
df_c.show()

In [None]:
df_d.show()

In [None]:
df_c.intersect(df_d).show()

In [None]:
df_c.subtract(df_d).show()

In [None]:
df_d.subtract(df_c).show()

In [None]:
e = [{"X1":"A","X2":1},{"X1":"B","X2":2},{"X1":"C","X2":3}]
f = [{"X1":"D","X2":4},{"X1":"E","X2":5},{"X1":"F","X2":6}]
df_e = sc.parallelize(e).toDF()
df_f = sc.parallelize(f).toDF()

In [None]:
df_e.unionAll(df_f).show()

In [None]:
# df_a.join(df_b, df_a['X1'] == df_b['X1'], 'semijoin')\
# .select(df_a['X1'].alias('X1-a'),df_b['X1'].alias('X1-b'),'X2','X3').show()
# Gives error Unsupported join type 'semijoin'.
# Supported join types include: 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 
# 'right', 'leftsemi'

## 12. DataFrame : Tables & SQL

In [None]:
# SQL on tables

In [None]:
df_a.registerTempTable("tableA")
sqlContext.sql("select * from tableA").show()

In [None]:
df_b.registerTempTable("tableB")
sqlContext.sql("select * from tableB").show()

In [None]:
sqlContext.sql("select * from tableA JOIN tableB on tableA.X1 = tableB.X1").show()

In [None]:
sqlContext.sql("select * from tableA LEFT JOIN tableB on tableA.X1 = tableB.X1").show()

In [None]:
sqlContext.sql("select * from tableA FULL JOIN tableB on tableA.X1 = tableB.X1").show()

### _That's All, Folks!_