# Set-up Environment

## Find Python Version in the System

In [3]:
#Documentation -- https://spark.apache.org/docs/1.6.1/sql-programming-guide.html
import platform
platform.python_version()

'2.7.5'

## Set Environment Variables

In [4]:
import os
import sys

###os.environ['SPARK_PREPEND_CLASSES']='/opt/sqljdbc_4.0/enu/sqljdbc4.jar'
os.environ['SPARK_CLASSPATH']='/opt/sqljdbc_4.0/enu/sqljdbc4.jar'
os.environ['SPARK_HOME']='/usr/hdp/2.4.2.0-258/spark'

import findspark
findspark.init()

## Include ALL packages to be imported to initialize and start pyspark

In [5]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.11:1.2.0 pyspark-shell'

## Import pyspark module

## Import additional modules within pyspark

In [6]:
import py4j
import pyspark
from pyspark.sql import SQLContext, Row
from pyspark import SparkConf, SparkContext

## Run Spark in various modes

## Documentation: http://spark.apache.org/docs/latest/submitting-applications.html

### Run Spark locally (local system = Head node )
### local[32] - Run on 32 cores (on the Head node)
### local[*] - Run on all available cores (on the Head node)

### sc (the Spark context) is the entry point for Spark's interpreter

In [10]:
conf = SparkConf().setMaster("local[*]").setAppName("My App")
sc = SparkContext(conf = conf)
#Check spark context version
sc.version

u'1.6.1'

## Create SQL context using Spark context

In [12]:
sqlctx = SQLContext(sc)

## Create Hive Context

### HiveContext is a super set of SQLContext that allows you to interact with Hive using SQL and Hive QL queries

In [13]:
from pyspark.sql import HiveContext

hsqlctx = HiveContext(sc)

## Functions offered by pyspark to manipulate data

### All of these functions behave similar to their Pandas counterparts

### Documentation: https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [14]:
import pyspark.sql.functions as F

# Reading data from a CSV file

## Reading CSV using textFile function in Spark

## Reading files using textFile function in Spark returns RDD (Resilient Distributed Dataset)

In [15]:
Employee_rdd = sc.textFile("/user/data/state13.csv")

# use map func to split each line in csv file using comma

Employee_rdd2 = Employee_rdd.map(lambda line: line.split(','))

# The csv file has a header. The below code gets the header

header = Employee_rdd2.first()

# Removes header from file from the RDD

Employee_rdd3 = Employee_rdd2.filter(lambda line: line!= header)

# take returns a list of lists [['','',''...],[''],['']]  
# take(1) returns the first row and [0] returns the first element which is the header 

Employee_rdd2.take(1)[0]

# Converting RDD to DataFrame

Employee_df = Employee_rdd3.toDF(header)

## Reading CSV using utilities like Databricks

### Reading files using Databricks utility in Spark returns a Dataframe and it can be done using either one: SQLContext or HiveContext

In [55]:
# using HiveContext
df1 = hsqlctx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/user/data/state13.csv')

# using SQLContext
df2 = sqlctx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/user/data/state13.csv')

# Subsetting the data - select only  a few columns

## Use the df.select( ) function and pass the list of columns to be displayed

In [16]:
Employee_df.select(['TimeZone', 'Latitude']).head(10)

[Row(TimeZone=u'Eastern', Latitude=u'30.869486'),
 Row(TimeZone=u'Eastern', Latitude=u'33.030828'),
 Row(TimeZone=u'Eastern', Latitude=u'33.750591'),
 Row(TimeZone=u'Eastern', Latitude=u'34.034049'),
 Row(TimeZone=u'Eastern', Latitude=u'32.44509'),
 Row(TimeZone=u'Eastern', Latitude=u'30.869486'),
 Row(TimeZone=u'Eastern', Latitude=u'34.254602'),
 Row(TimeZone=u'Eastern', Latitude=u'33.777293'),
 Row(TimeZone=u'Eastern', Latitude=u'32.7921'),
 Row(TimeZone=u'Eastern', Latitude=u'34.177555')]

In [17]:
Employee_df.select('CityName').distinct().collect()

[Row(CityName=u'Fitzgerald'),
 Row(CityName=u'Thomson'),
 Row(CityName=u'Alto'),
 Row(CityName=u'Sargent'),
 Row(CityName=u'Alpharetta'),
 Row(CityName=u'Jeffersonville'),
 Row(CityName=u'Folkston'),
 Row(CityName=u'Moreland'),
 Row(CityName=u'Broxton'),
 Row(CityName=u'Crawfordville'),
 Row(CityName=u'Georgetown'),
 Row(CityName=u'Montezuma'),
 Row(CityName=u'Marble Hill'),
 Row(CityName=u'Lagrange'),
 Row(CityName=u'Commerce'),
 Row(CityName=u'Young Harris'),
 Row(CityName=u'Springfield'),
 Row(CityName=u'Riceboro'),
 Row(CityName=u'Oakwood'),
 Row(CityName=u'Wrens'),
 Row(CityName=u'Hardwick'),
 Row(CityName=u'Morganton'),
 Row(CityName=u'Funston'),
 Row(CityName=u'Offerman'),
 Row(CityName=u'Grantville'),
 Row(CityName=u'Molena'),
 Row(CityName=u'Kingston'),
 Row(CityName=u'Mansfield'),
 Row(CityName=u'Buena Vista'),
 Row(CityName=u'Hiawassee'),
 Row(CityName=u'Shannon'),
 Row(CityName=u'Ray City'),
 Row(CityName=u'Bowdon Junction'),
 Row(CityName=u'Patterson'),
 Row(CityName=u'Roy

# View dataframe

### df.show(n), df.take(n), df.head(n) - displays first n records 

### df.collect( ) - Displays whole dataframe

### Dataframe API has no function tail( ), last( ) to get the last few rows

# Lazy Evaluation in Spark

All Spark operations can be classified into two categories: (1) Transformations (2) Actions

Spark creates a DAG (Directed Acyclic Graph) for all "Transformation" operations and it executes (all the operations from the beginning) once an "Action" operation is invoked, hence the term "Lazy Evaluation".

### What are some examples of transformation operations ?

Reading data from csv or Hive to create Resilient Distributed Dataset (RDD) or DataFrame (DF).
   
Creating new columns in RDD or DF.
   
Any operation that does not return results to the user.
    
### What are some examples of action operations? 

Calling head( ), take( ), or collect( ) on a dataframe or RDD.
   
### NOTE: Don't be surprised if none of the commands throw errors or exceptions until head( ) has been called on the result.  Only then does the real execution take place and are the errors revealed.

## Reading data from Hive tables using Hive Queries
### Use hsqlctx.sql(query) function

In [19]:
df = hsqlctx.sql('select * from redcrossteam2.all_states_2000_2016')

## Aggregation of data

Similar to Pandas aggregation. Use groupBy or groupby functions in dataframe. Note that groupBy (with a capital B) and groupby are aliases of each other. 

Refer to the documentation for more details: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

## Pro Tip for Aggregation

Always use .agg( ) function along with groupby or groupBy. Why?  It gives more flexibility. 
It gives the capability to mimic the SQL groupby functionality including renaming the aggregate columns and multiple aggregations on one column.

For example,

consider the SQL statement

select count(*) as num_states, min(donation_dt) as min_don_dt,max(donation_dt) as max_don_dt from table_name group by            StateName;

What does this SQL do? 
It retrieves the number of records, maximum donation date, and minimum donation date for each state
Notice that donation_dt is used in 2 different aggregate functions and selected as 2 different aggregate columns

This can be translated into a df group by function as shown below
    
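df.groupBy('StateName').agg(F.count('StateName').alias('num_states'), F.min('donation_dt').alias('min_don_dt'), F.max('donation_dt').alias('max_don_dt')).collect()

(The dict form of agg( ) only accepts a flat {column name: function name} mapping, so it can neither rename the result nor apply two functions to the same column; use column expressions from pyspark.sql.functions, imported above as F, with .alias( ) instead.)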



In [25]:
df.head(2)

[Row(arc_id=65264167, donation_dt=datetime.date(2005, 2, 10), prodctv_proc_ind=1, first_donat_ind=0, deferral_ind=0, donation_ind=1, sponsor_name=u'University of Georgia Hill Community', sponsor_category=u'EDUCATION', site_zip=30609, donation_type=u'Red Cell Apheresis', ziptype=u'U', cityname=u'Athens', statename=u'Georgia'),
 Row(arc_id=84098182, donation_dt=datetime.date(2008, 3, 20), prodctv_proc_ind=1, first_donat_ind=1, deferral_ind=0, donation_ind=1, sponsor_name=u'Shiloh High School', sponsor_category=u'EDUCATION', site_zip=30039, donation_type=u'Whole Blood', ziptype=u'S', cityname=u'Snellville', statename=u'Georgia')]

In [None]:

# Earliest donation date, number of rows and latest donation date per state.
# A dict literal cannot hold the key 'donation_dt' twice: the later 'max' entry
# silently replaces the earlier 'min' entry, so the dict form would compute only
# one of the two aggregates. Column expressions allow several aggregates on the
# same column and let each result be named explicitly.
df.groupBy('statename').agg(
    F.min('donation_dt').alias('min_donation_dt'),
    F.count('statename').alias('num_states'),
    F.max('donation_dt').alias('max_donation_dt')
).collect()


In [None]:
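## NOTE: the pandas-style nested dict below is not accepted by PySpark's agg( ); use F.min(...).alias(...) style column expressions to rename aggregates.
##df.groupBy('statename').agg({'statename' : {'num_states' : 'count'},'donation_dt':  {'min_donation_dt' : 'min' , 'max_donation_dt' : 'max'}}).collect()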

## Joins

use df.join( ) function 

## Pro Tip for Join

Always create aliases for dataframes being joined using df.alias( ) function before the join operation. eg., left_df = df1.alias('left_df').

Use aliases in the join operation. 
join() function has to be called using a dataframe object and takes 3 arguments, the second dataframe to be joined, join condition and the type of join to be performed.

For multiple join conditions, use a list of conditions, e.g., cond = [left_df.col1 == right_df.colA, left_df.col2 == right_df.colB,..]

Example of a join syntax is below.

In [43]:
# creating aliases
left_df = df1.alias('left_df')
right_df =df2.alias('right_df')

# join conditions
cond = [left_df.arc_id == right_df.arc_id]

# join function
df_joined = left_df.join(right_df,cond,'inner').select([F.col("left_df.arc_id"),F.col("right_df.bzd_assessedhomevalue")]).collect()

## Selecting all columns from one dataframe and a few columns from another dataframe in a join

In [46]:
# Alias both sides so their columns can be referenced unambiguously by name.
left_df = df1.alias('left_df')
right_df = df2.alias('right_df')

# Join condition(s): rows match when arc_id is equal on both sides.
cond = [left_df.arc_id == right_df.arc_id]

# Every column of the left dataframe, qualified with its alias ...
left_cols_to_select = [F.col('left_df.' + name) for name in left_df.columns]
# ... plus two hand-picked columns from the right dataframe.
right_cols_to_select = [
    F.col('right_df.bzd_assessedhomevalue'),
    F.col('right_df.bzd_avg_inq_all12'),
]

# Inner join first, then project down to the columns chosen above.
joined = left_df.join(right_df, cond, 'inner')
df_joined = joined.select(left_cols_to_select + right_cols_to_select)

## Query dataframes using SQL-like syntax


Spark Dataframes/RDDs can be accessed using a SQL-like Syntax. There are 2 steps:

1) Register the Dataframe as a temp table using SQLContext.registerDataFrameAsTable(df,'tableName')

2) Use SQLContext.sql( ) function to query it like a normal table


In case of RDDs, 

1) Convert the RDD to a Dataframe with RDDname.toDF( ) and register that Dataframe as a temp table using its registerTempTable('tableName') function

2) Use SQLContext.sql( ) function to query it like a normal table


Example below.

In [54]:
# Register as a temp table
sqlctx.registerDataFrameAsTable(df_joined,"dfJoinedTable")

# Use SQL query
sqlctx.sql("select zip4, min(birth_dt) as MinD, max(birth_dt) as MaxD,count(zip4) as num from dfJoinedTable group by zip4").take(100)

[Row(zip4=7138, MinD=u'1973/08/11', MaxD=u'1973/08/11', num=1),
 Row(zip4=3546, MinD=u'1984/11/24', MaxD=u'1984/11/24', num=1),
 Row(zip4=2357, MinD=u'1976/02/01', MaxD=u'1976/02/01', num=1),
 Row(zip4=2165, MinD=u'1989/02/27', MaxD=u'1989/02/27', num=1),
 Row(zip4=1801, MinD=u'1989/03/03', MaxD=u'1989/03/03', num=1),
 Row(zip4=3406, MinD=u'1972/01/19', MaxD=u'1972/01/19', num=1),
 Row(zip4=2420, MinD=u'1982/05/07', MaxD=u'1982/05/07', num=1),
 Row(zip4=1224, MinD=u'1985/08/30', MaxD=u'1985/08/30', num=1),
 Row(zip4=2224, MinD=u'1984/03/19', MaxD=u'1984/03/19', num=1)]

# Creating a new column in a Spark Dataframe

Use df.withColumn( ) function. 
withColumn( ) takes 2 arguments - the new column name and value to be filled in that column.

A new column can be created based on other columns in the dataframe based on a business logic. In this case, do the following:

1) write the business logic in a function

2) convert to User Defined Function (UDF)

3) pass the UDF as an argument to withColumn( ) function


In [None]:
# Define the business logic as a function (date conversion in this case)

from datetime import datetime
def convDateFormat(x):
    """Parse a 'YYYY/MM/DD' string into a datetime.

    Returns None when x is None, so a missing value stays missing instead of
    making strptime raise TypeError. Any other input is handed to strptime
    unchanged, so a malformed string still raises ValueError.
    """
    if x is None:
        return None
    return datetime.strptime(x, '%Y/%m/%d')

# Create a UDF
from pyspark.sql.types import DateType
func =  F.udf (convDateFormat, DateType())

# use withColumn() to create a new column
df1 = df1.withColumn('birth_dt_test', func(F.col('birth_dt')))

## Note: 
Here is the "functional programming" way of converting date

func = F.udf(lambda x: datetime.strptime(x, '%Y/%m/%d'), DateType())



## Load a csv file into Hive using pyspark

CSV files can be loaded into Hive using pyspark HiveContext without even creating the table structure beforehand. The steps are:

1) Read csv into a Dataframe or RDD
    
2) Register the Dataframe as a temp table using HiveContext's registerDataFrameAsTable( ) function (an RDD must first be converted to a Dataframe with toDF( ))
    
3) Load into hive by running the SQL syntax "create table TableName as select * from TempTableName" in HiveContext

Example below.

In [65]:
# Read the CSV file into a dataframe (first row is the header, column types are inferred)
df = hsqlctx.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/user/data/donor_summary_sample.csv')

# Register as a temp table (USING HIVECONTEXT! DONT USE SQLCONTEXT TO REGISTER AS TEMP TABLE):
# the "create table ... as select" below runs in hsqlctx, so the temp table
# has to be registered in that same context.
hsqlctx.registerDataFrameAsTable(df, "donor_summary_tmp")

# Load into Hive, creating a new table on-the-fly.
# "if exists" keeps the drop from failing when the table has not been created
# yet (e.g. on the very first run).
hsqlctx.sql("drop table if exists donor_summary_sample")
hsqlctx.sql("create table donor_summary_sample as select * from donor_summary_tmp")


DataFrame[]

## Bring data into memory

Most of the time, the data has to be brought into memory for convenience (e.g., to visualize or to plot)

Although pyspark supports plotting, it is generally preferred to have the data to be plotted in memory and use existing plotting libraries available.

The data stored in spark dataframes can be brought into memory as a Pandas dataframe by calling toPandas( ) on the Spark dataframe as shown below.

### Caution: Make sure that the data fits into memory before it can be brought into Pandas

In [None]:
pd_df = df1.toPandas()

## Stop Spark context 

In [48]:
sc.stop()

## Fun exercise

Mimic the SQL statement in pyspark:

select round( datediff ( donation_dt, birth_dt) / 365) as age, count(*) from all_states1_2000_2016 where round( datediff( donation_dt, birth_dt) / 365) >= 0 group by round ( datediff(donation_dt, birth_dt)/  365) order by age;

In [None]:
# Borrow the column names from the Hive table; one row is fetched because only
# the schema (column names) is used below.
df_all_states_h = hsqlctx.sql("select * from all_states1 limit 1")

# Load the data itself from the Parquet file.
df_all_states = sqlctx.read.parquet('/user/data/all_states')

# Rename the Parquet columns, position by position, to the Hive column names.
for position, hive_name in enumerate(df_all_states_h.columns):
    parquet_name = df_all_states.columns[position]
    df_all_states = df_all_states.withColumnRenamed(parquet_name, hive_name)

# age = days between birth_dt and donation_dt, divided by 365 and rounded.
days_between = F.datediff(df_all_states.donation_dt, df_all_states.birth_dt)
df_all_states = df_all_states.withColumn('age', F.round(days_between / 365))

# Keep rows whose donation year is within 2000-2016 and whose age is non-negative.
donation_year = F.year(df_all_states.donation_dt)
in_window = (donation_year >= 2000) & (donation_year <= 2016)
df_all_states_2000_2016 = df_all_states.where(in_window & (df_all_states.age >= 0))

# Count arc_id values per age and return the rows ordered by age.
counts_by_age = df_all_states_2000_2016.groupby('age').agg({'arc_id': 'count'})
counts_by_age.sort('age').collect()
