
Set up the Spark environment.

In [None]:
import sys
sys.path.append('/home/student/ROI/SparkProgram')
from initspark import *
sc, spark, conf = initspark()

Turn a simple RDD into a DataFrame. 

In [None]:
x = sc.parallelize([(1,'alpha'),(2,'beta')])
x0 = spark.createDataFrame(x)
x0.show()

Give the DataFrame meaningful column names.

In [None]:
x1 = spark.createDataFrame(x, schema=['ID','Name'])
x1.show()
print(x1)

Give a DataFrame a schema with column names and data types.

In [None]:
x2 = spark.createDataFrame(x, 'ID:int, Name:string')
x2.show()
print(x2)

Load a text file into an RDD and clean it up as before.

In [None]:
filename = '/home/student/ROI/Spark/datasets/finance/CreditCard.csv'
cc = sc.textFile(filename)
first = cc.first()
cc = cc.filter(lambda x : x != first)
cc.take(10)


In [None]:
import datetime
cc = cc.map(lambda x : x.split(',')) 
cc.take(10)

In [None]:
cc = cc.map(lambda x : (x[0][1:], x[1][1:-1], datetime.datetime.strptime(x[2], '%d-%b-%y').date(), x[3], x[4], x[5], float(x[6])))
print(cc.take(10))
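The slicing in the map above is easier to follow on a single line. This is a minimal sketch that assumes a hypothetical sample row with the same layout as CreditCard.csv: a quoted "City, Country" field followed by date, card type, transaction type, gender, and amount. Splitting on every comma breaks the quoted field in two, which is why the map drops the leading quote from `x[0]` and trims both the leading space and the trailing quote from `x[1]`. The `csv` module is shown as an alternative that honours the quotes instead. Note that `%b` expects English month abbreviations, so the parse assumes an English or C locale.

```python
import csv
import datetime

# Hypothetical sample row in the assumed CreditCard.csv layout
line = '"Delhi, India",29-Oct-14,Gold,Bills,F,82475'

def parse_naive(line):
    """Mirror the notebook's map(): split on every comma, then trim quotes."""
    x = line.split(',')
    # x[0] is '"Delhi' and x[1] is ' India"', so slice off the stray characters
    return (x[0][1:], x[1][1:-1],
            datetime.datetime.strptime(x[2], '%d-%b-%y').date(),
            x[3], x[4], x[5], float(x[6]))

def parse_csv(line):
    """Alternative: let the csv module honour the quotes, then split City/Country."""
    city_country, date, card, tran, gender, amount = next(csv.reader([line]))
    city, country = (part.strip() for part in city_country.split(',', 1))
    return (city, country,
            datetime.datetime.strptime(date, '%d-%b-%y').date(),
            card, tran, gender, float(amount))

print(parse_naive(line))
print(parse_naive(line) == parse_csv(line))  # True
```

The naive split only works because every row has exactly one comma inside the quotes; the `csv` version keeps working if that assumption ever breaks.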

Turn the RDD into a DataFrame.

In [None]:
df = spark.createDataFrame(cc)
df.show()

The RDD's built-in toDF method does the same thing.

In [None]:
df = cc.toDF()
df.show()
print(df)

In [None]:
df = cc.toDF(['City', 'Country', 'Date', 'CardType', 'TranType', 'Gender', 'Amount'])
df.show()

In [None]:
df = cc.toDF('City: string, Country: string, Date: date, CardType: string, TranType: string, Gender: string, Amount: double')
df.show()
print(df)


**LAB:** Use the regions and territories RDDs from the previous lab and convert them into DataFrames with meaningful schemas.


In [None]:
regions = sc.textFile('hdfs://localhost:9000/regions')
regions = regions.map(lambda x : x.split(',')).map(lambda x : (int(x[0]), x[1]))

territories = sc.textFile('hdfs://localhost:9000/territories')
territories = territories.map(lambda x : x.split(',')).map(lambda x : (int(x[0]), x[1], int(x[2])))



Convert each row of a DataFrame into a JSON string; toJSON returns an RDD of strings.

In [None]:
print (df.toJSON().take(10))

In [None]:
df.printSchema()
print (df.columns, df.count())

Choose particular columns from a DataFrame.

In [None]:
df.select('City', 'Country', 'Amount').show(10)

In [None]:
df.select('City', 'Country').distinct().show()

Sort a DataFrame. The sort and orderBy methods are aliases for the same method.

In [None]:
df.sort(df.Amount).show()
df.sort(df.Amount, ascending = False).show()
df.select('City', 'Amount').orderBy(df.City).show()

Create a new DataFrame with a calculated column added. DataFrames are immutable, so withColumn returns a new DataFrame and leaves df unchanged.

In [None]:
df2 = df.withColumn('Discount', df.Amount * .03)
df2.show()

Remove an unwanted column from a DataFrame.

In [None]:
df3 = df2.drop(df2.Country)
df3.show()

The filter and where methods are aliases, and each accepts the condition either as a Column expression or as a SQL expression string.

In [None]:
df3.filter(df3.Amount < 4000).show()
print(df3.filter('Amount < 4000').count())
print(df3.where('Amount < 4000').count())
print(df3.where(df3.Amount < 4000).count())

print (df3.where((df3.Amount > 3000) & (df3.Amount < 4000)).count())
print (df3.where('Amount > 3000 and Amount < 4000').count())
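The parentheses around each comparison in the Column form are required, not stylistic: Python's `&` binds more tightly than `<` and `>`, so `amount > 3000 & amount < 4000` parses as a chained comparison. A plain-integer sketch (the value 5000 is an arbitrary choice outside the range) shows the trap.

```python
amount = 5000  # deliberately outside the 3000-4000 range

# & binds more tightly than < and >, so Python reads the unparenthesised form as
# the chained comparison  amount > (3000 & amount) < 4000
unparenthesised = amount > 3000 & amount < 4000
explicit = amount > (3000 & amount) < 4000
# Parenthesising each comparison gives the intended range test
parenthesised = (amount > 3000) & (amount < 4000)

print(3000 & amount)               # 904
print(unparenthesised, explicit)   # True True  (wrong answer)
print(parenthesised)               # False
```

With Column objects the unparenthesised version typically fails with an error rather than returning a wrong count, because the chained comparison asks Python for the truth value of a Column. The SQL string form avoids the issue because Spark parses `and` itself.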


**LAB:** Using the df3 DataFrame, answer the following questions:

How many Platinum card purchases were there with a discount above $100?

Find the ten biggest discount amounts earned by women and show just the purchase amount, discount, and date.

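DataFrames support inner, left, right, and full outer joins. Pass the other DataFrame, the join condition, and optionally the join type; the default is an inner join.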

In [None]:
tab1 = sc.parallelize([(1, 'Alpha'), (2, 'Beta'), (3, 'Delta')]).toDF('ID:int, code:string')
tab2 = sc.parallelize([(100, 'One', 1), (101, 'Two', 2), (102, 'Three', 1), (103, 'Four', 4)]).toDF('ID:int, name:string, parentID:int')
tab1.join(tab2, tab1.ID == tab2.parentID).show()
tab1.join(tab2, tab1.ID == tab2.parentID, 'left').show()
tab1.join(tab2, tab1.ID == tab2.parentID, 'right').show()
tab1.join(tab2, tab1.ID == tab2.parentID, 'full').show()
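To see which rows each join type should return, here is a plain-Python, nested-loop model of `tab1.join(tab2, tab1.ID == tab2.parentID, how)` on the same sample data. It only illustrates the semantics; it is not how Spark executes joins, and Spark does not guarantee row order in its output. The `join` helper is hypothetical.

```python
tab1 = [(1, 'Alpha'), (2, 'Beta'), (3, 'Delta')]
tab2 = [(100, 'One', 1), (101, 'Two', 2), (102, 'Three', 1), (103, 'Four', 4)]

def join(left, right, how='inner'):
    """Nested-loop model of left.join(right, left.ID == right.parentID, how)."""
    out = []
    matched_right = set()
    for l_row in left:
        hits = [r_row for r_row in right if l_row[0] == r_row[2]]
        for r_row in hits:
            out.append(l_row + r_row)
            matched_right.add(r_row)
        # Unmatched left rows survive a left or full join, padded with None
        if not hits and how in ('left', 'full'):
            out.append(l_row + (None, None, None))
    # Unmatched right rows survive a right or full join, padded with None
    if how in ('right', 'full'):
        out.extend((None, None) + r_row for r_row in right if r_row not in matched_right)
    return out

for how in ('inner', 'left', 'right', 'full'):
    print(how, join(tab1, tab2, how))
```

The inner join returns three rows, the left join adds Delta with nulls, the right join adds Four with nulls, and the full join adds both.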


Examples of aggregate functions.

In [None]:
tab3 = sc.parallelize([(1, 10), (1, 20), (1, 30), (2, 40), (2,50)]).toDF('groupID:int, amount:int')
tab3.groupby('groupID').max().show()
tab3.groupby('groupID').sum().show()
x = tab3.groupby('groupID')
# A dict maps each column to ONE function; a repeated key would silently keep only the last one
x.agg({'amount':'sum'}).show()
# To apply several functions to the same column, use the functions module instead
from pyspark.sql import functions as F
x.agg(F.sum('amount'), F.max('amount')).show()
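The dictionary form of agg maps each column name to a single function name, and the limit comes from Python itself: a dict literal with a repeated key keeps only the last value, so `{'amount': 'sum', 'amount': 'max'}` arrives as `{'amount': 'max'}` and no sum is computed. The sketch below demonstrates that, then computes the per-group sum and max for the tab3 sample data in plain Python as a reference to check the Spark output against.

```python
from collections import defaultdict

# A dict literal silently keeps only the last value for a repeated key
spec = {'amount': 'sum', 'amount': 'max'}
print(spec)  # {'amount': 'max'}

# Plain-Python equivalent of tab3.groupby('groupID') with sum and max
rows = [(1, 10), (1, 20), (1, 30), (2, 40), (2, 50)]
groups = defaultdict(list)
for group_id, amount in rows:
    groups[group_id].append(amount)
result = {g: (sum(v), max(v)) for g, v in sorted(groups.items())}
print(result)  # {1: (60, 30), 2: (90, 50)}
```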

Three equivalent ways to read a CSV file directly into a DataFrame, treating the first line as the header and inferring column types from the data.

In [None]:
filename = '/home/student/ROI/Spark/datasets/finance/CreditCard.csv'
df4 = spark.read.load(filename, format = 'csv', sep = ',', inferSchema = True, header = True)
df4.printSchema()

In [None]:
df4 = spark.read.format('csv').option('header','true').option('inferSchema','true').load(filename)
df4.printSchema()

In [None]:
df4 = spark.read.csv(filename, header = True, inferSchema = True)
df4.printSchema()

In [None]:
df4.show()

**LAB:** Read the Products file from the JSON folder and the categories from the CSVHeaders folder, then join them, displaying only the product and category IDs and names, sorted by categoryID and then productID.

Hint: Drop the ambiguous column after the join.

Change the name of the City column to CityCountry.

In [None]:
cols = df4.columns
cols[0] = 'CityCountry'
df4 = df4.toDF(*cols)
df4.printSchema()

Apply custom UDFs to split CityCountry into separate City and Country columns, and use the built-in to_date function to convert Date into a date data type.

In [None]:
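from pyspark.sql.functions import udf, to_date
from pyspark.sql.types import *

def city(x):
    # Text before the first comma, e.g. 'Delhi, India' -> 'Delhi'
    return None if x is None else x.split(',', 1)[0].strip()

def country(x):
    # Text after the first comma, without the leading space; None if there is no comma
    if x is None or ',' not in x:
        return None
    return x.split(',', 1)[1].strip()

# Wrap the Python functions as UDFs that return strings
city_udf = udf(city, StringType())
country_udf = udf(country, StringType())

# to_date is built in, so the Date column needs no UDF
df5 = df4.withColumn('City', city_udf(df4.CityCountry)) \
      .withColumn('Country', country_udf(df4.CityCountry)) \
      .withColumn('Date', to_date(df4.Date, 'dd-MMM-yy')) \
      .drop('CityCountry')
df5.show()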
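Because UDFs are ordinary Python functions, they can be checked locally before Spark runs them. This sketch compares two ways to separate city and country: slicing with `str.find` and splitting with `str.split`. The function names are hypothetical. It shows two traps in the `find` approach: the country keeps its leading space, and when there is no comma `find` returns -1, so the city loses its last character.

```python
def city_find(x):
    return x[:x.find(',')]

def country_find(x):
    return x[x.find(',') + 1:]

def city_split(x):
    return None if x is None else x.split(',', 1)[0].strip()

def country_split(x):
    if x is None or ',' not in x:
        return None
    return x.split(',', 1)[1].strip()

print(repr(country_find('Delhi, India')))   # ' India'  (leading space kept)
print(repr(city_find('Mumbai')))            # 'Mumba'   (find() returned -1)
print(city_split('Delhi, India'), country_split('Delhi, India'))  # Delhi India
print(city_split('Mumbai'), country_split('Mumbai'))              # Mumbai None
```

Spark passes SQL nulls to a Python UDF as `None`, so handling `None` explicitly keeps a single null row from failing the whole job.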

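DataFrames can be written to a variety of file formats. Here it is written as JSON; Spark creates a directory of part files at the given path, and mode('overwrite') lets the cell be rerun without failing because the path already exists.

In [None]:
df5.write.mode('overwrite').json('/home/student/Spark/CreditCard.json')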

Read the JSON back into a DataFrame. Note that the date type is lost: JSON has no date type, so schema inference reads Date back as a string.

In [None]:
df6 = spark.read.json('/home/student/Spark/CreditCard.json')
df6.printSchema()
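The lost date type is a property of JSON itself rather than of Spark. A plain-Python sketch with the `json` module, using a hypothetical row, shows the same round trip: the date has to be written as an ISO-8601 string (yyyy-MM-dd, which is also Spark's default date format for JSON output), and it comes back as a `str` until something that knows the intended type converts it.

```python
import datetime
import json

# Hypothetical row with a real date value
row = {'City': 'Delhi', 'Date': datetime.date(2014, 10, 29), 'Amount': 82475.0}

# json cannot serialise a date directly, so write it as an ISO-8601 string
text = json.dumps(row, default=lambda d: d.isoformat())
back = json.loads(text)

print(text)                          # {"City": "Delhi", "Date": "2014-10-29", "Amount": 82475.0}
print(type(back['Date']).__name__)   # str

# Getting the date back requires knowing the intended type
restored = datetime.date.fromisoformat(back['Date'])
print(restored)                      # 2014-10-29
```

An explicit schema plays the role of `fromisoformat` here: it tells the reader which fields to parse as dates.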

Define an explicit schema so that the reader names each field and converts it to the desired data type instead of inferring it. For JSON, the field names in the schema must match the keys in the file.

In [None]:
schema = StructType([
    StructField('Date', DateType()), 
    StructField('Card Type', StringType()),
    StructField('Exp Type', StringType()),
    StructField('Gender', StringType()),
    StructField('Amount', FloatType()),
    StructField('City', StringType()),
    StructField('Country', StringType())
])
df6 = spark.read.json('/home/student/Spark/CreditCard.json', schema = schema)
df6.printSchema()