# Data Access Using Spark SQL and DataFrames

## Activity: Projection and Selection Queries

In this module, you will practice writing code to retrieve data using the Spark SQL and DataFrame APIs.

The complete list of DataFrame functions is available in the [Java DataFrame API reference (Spark 1.6.1)](https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/sql/DataFrame.html), the [PySpark `pyspark.sql` reference (Spark 2.1.0)](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join) and the [Scala `functions` reference (Spark 2.2.0)](https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.functions$).


In this activity, we will use the HR schema shown below.
![hr](HR.gif)

### INITIALIZATION
The first section of this script is the initialization section.
In this section, we prepare the Spark environment to recognize and process SQL statements.

In [1]:
from pyspark import SparkContext, SparkConf # Spark
from pyspark.sql import SparkSession # Spark SQL
from pyspark.sql.types import *

#additional 
from pyspark.sql.functions import *

# local[*]: run Spark locally with as many worker threads as there are logical cores on your machine.
# To run Spark locally with 'k' worker threads, specify `local[k]` as the master (here, `local[3]`).
# The `appName` field is the name shown on the Spark cluster UI.
conf = SparkConf().setMaster("local[3]").setAppName("Introduction to Apache Spark")

# Reuse the existing Spark context if there is one; otherwise create a new one with this configuration
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sparkContext=sc)


### DATA STRUCTURE DEFINITION

In this section, we define the schemas (data structures) that match the data files provided as data sources.

In [2]:
#COUNTRIES TABLE
scCountries = StructType([
StructField("country_id",StringType()),
StructField("country_name",StringType()),
StructField("region_id",IntegerType())
])

#DEPARTMENTS TABLE
scDepartments = StructType([StructField("department_id",IntegerType()),
StructField("department_name",StringType()),
StructField("manager_id",IntegerType()),
StructField("location_id",IntegerType())
])

#EMPLOYEES TABLE
scEmployees = StructType([
StructField("employee_id",IntegerType()),
StructField("first_name",StringType()),
StructField("last_name",StringType()),
StructField("email",StringType()),
StructField("phone_number",StringType()),
StructField("hire_date",StringType()),
StructField("job_id",StringType()),
StructField("salary",IntegerType()),
StructField("commission_pct",FloatType()),
StructField("manager_id",IntegerType()),
StructField("department_id",IntegerType())
])

#JOBS TABLE
scJobs = StructType([
StructField("job_id",StringType()),
StructField("job_title",StringType()),
StructField("min_salary",IntegerType()),
StructField("max_salary",IntegerType())
])

#JOB_HISTORY TABLE
scJob_history = StructType([
StructField("employee_id",IntegerType()),
StructField("start_date",StringType()),
StructField("end_date",StringType()),
StructField("job_id",StringType()),
StructField("department_id",IntegerType())
])

#LOCATIONS TABLE
scLocations = StructType([
StructField("location_id",IntegerType()),
StructField("street_address",StringType()),
StructField("postal_code",StringType()),
StructField("city",StringType()),
StructField("state_province",StringType()),
StructField("country_id",StringType())
])

#REGIONS TABLE
scRegions = StructType([
StructField("region_id",IntegerType()),
StructField("region_name",StringType())
])

### DATA LOADING

In [3]:
#COUNTRIES DATA
dataCountries = sc.textFile('COUNTRIES.csv')
dataCountries = dataCountries.map(lambda x: x.split(','))
dataCountries = dataCountries.map(lambda x: [x[0],x[1], int(x[2])])

#DEPARTMENTS DATA
dataDepartments = sc.textFile('DEPARTMENTS.csv')
dataDepartments = dataDepartments.map(lambda x: x.split(','))
dataDepartments = dataDepartments.map(lambda x: [int(x[0]),x[1], int(x[2]), int(x[3])])

#EMPLOYEES DATA
dataEmployees = sc.textFile('EMPLOYEES.csv')
dataEmployees = dataEmployees.map(lambda x: x.split(','))
dataEmployees = dataEmployees.map(lambda x: [int(x[0]),x[1], x[2], \
                                             x[3],x[4], x[5], x[6], \
                                             int(x[7]),float(x[8]), int(x[9]), int(x[10])\
                                            ])

#JOBS_DATA
dataJobs = sc.textFile('JOBS.csv')
dataJobs = dataJobs.map(lambda x: x.split(','))
dataJobs = dataJobs.map(lambda x: [x[0],x[1], \
                                   int(x[2]),int(x[3])\
                                   ])

#JOB_HISTORY_DATA
dataJob_history = sc.textFile('JOB_HISTORY.csv')
dataJob_history = dataJob_history.map(lambda x: x.split(','))
dataJob_history = dataJob_history.map(lambda x: [int(x[0]),x[1], \
                                   x[2],x[3],int(x[4])\
                                   ])

#LOCATION_DATA
dataLocations = sc.textFile('LOCATIONS.csv')
dataLocations = dataLocations.map(lambda x: x.split(','))
dataLocations = dataLocations.map(lambda x: [int(x[0]),x[1], \
                                   x[2],x[3],x[4],x[5]\
                                   ])
#REGIONS DATA
dataRegions = sc.textFile('REGIONS.csv')
dataRegions = dataRegions.map(lambda x: x.split(','))
dataRegions = dataRegions.map(lambda x: [int(x[0]),x[1] ])
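
The loading code above splits each line on `,` and casts fields by position. That works only when no field contains a comma and no numeric field is empty. The following is a minimal sketch of a more tolerant parser using Python's standard `csv` module. The helper name `parse_line`, the `location_casts` list, and the sample line are hypothetical and are not part of the provided data files.

```python
import csv
from io import StringIO

def parse_line(line, casts):
    """Split one CSV line (honouring quotes) and cast each field.

    Empty fields become None instead of raising an error in int()/float().
    """
    fields = next(csv.reader(StringIO(line)))
    return [cast(f) if f != "" else None for f, cast in zip(fields, casts)]

# One cast per column, in the same order as the fields of scLocations
location_casts = [int, str, str, str, str, str]

# Hypothetical line: quoted address containing a comma, empty state_province
sample = '1000,"1297 Via Cola di Rie, Apt 2",00989,Roma,,IT'

row = parse_line(sample, location_casts)
print(row)

# A plain split breaks the quoted address into two fields
print(len(sample.split(',')))  # 7 fields instead of 6
```

Under these assumptions, the helper could replace the two `map` steps for a table, for example `sc.textFile('LOCATIONS.csv').map(lambda x: parse_line(x, location_casts))`. Whether the provided files actually contain quoted or empty fields is not stated here, so treat this as an optional safeguard.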


### PREPARING DATAFRAMES

In [4]:
dfCountries = spark.createDataFrame(dataCountries,schema=scCountries) 
dfCountries.createOrReplaceTempView("dataCountries")

dfDepartments = spark.createDataFrame(dataDepartments,schema=scDepartments) 
dfDepartments.createOrReplaceTempView("dataDepartments")

dfEmployees = spark.createDataFrame(dataEmployees,schema=scEmployees) 
dfEmployees.createOrReplaceTempView("dataEmployees")

dfJobs = spark.createDataFrame(dataJobs,schema=scJobs) 
dfJobs.createOrReplaceTempView("dataJobs")

dfJob_history = spark.createDataFrame(dataJob_history,schema=scJob_history) 
dfJob_history.createOrReplaceTempView("dataJob_history")

dfLocations = spark.createDataFrame(dataLocations,schema=scLocations) 
dfLocations.createOrReplaceTempView("dataLocations")

dfRegions = spark.createDataFrame(dataRegions,schema=scRegions) 
dfRegions.createOrReplaceTempView("dataRegions")


In [13]:
from kafka import KafkaConsumer
from json import loads
import matplotlib.pyplot as plt

# Connect a Kafka consumer to the 'dfTest' topic; each message value is decoded from UTF-8 JSON
consumer = KafkaConsumer(
    'dfTest',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my_group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

In [14]:
# Print the 'row' field of each message; the loop blocks until it is interrupted
for message in consumer:
    value = message.value
    print(value['row'])

Diana Lorentz
Nancy Greenberg
Daniel Faviet
John Chen
Ismael Sciarra
Jose Manuel Urman
Luis Popp
Den Raphaely
Alexander Khoo
Shelli Baida
Sigal Tobias
Guy Himuro
Karen Colmenares
Matthew Weiss
Adam Fripp
Payam Kaufling
Shanta Vollman
Kevin Mourgos
Julia Nayer
Irene Mikkilineni
James Landry
Steven Markle
Laura Bissot
Mozhe Atkinson
James Marlow
TJ Olson
Jason Mallin
Michael Rogers
Ki Gee
Hazel Philtanker
Renske Ladwig
Stephen Stiles
John Seo
Joshua Patel
Trenna Rajs
Curtis Davies
Randall Matos
Peter Vargas
John Russell
Karen Partners
Alberto Errazuriz
Gerald Cambrault
Eleni Zlotkey
Peter Tucker
David Bernstein
Peter Hall
Christopher Olsen
Nanette Cambrault
Steven King
Neena Kochhar
Lex De Haan
Alexander Hunold
Bruce Ernst
David Austin


KeyboardInterrupt: 
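
The consumer above decodes each message value from UTF-8 JSON and reads its `row` field. The sketch below shows that round trip without a broker, to make the expected message format explicit. The `serialize` function is a hypothetical producer-side counterpart (the producer code is not shown in this notebook), and the payload shape `{'row': ...}` is inferred from the consumer loop.

```python
from json import dumps, loads

def serialize(value):
    """Hypothetical producer-side serializer: Python object -> UTF-8 JSON bytes."""
    return dumps(value).encode('utf-8')

def deserialize(raw):
    """Same logic as the value_deserializer passed to KafkaConsumer."""
    return loads(raw.decode('utf-8'))

# Simulate one message travelling through the topic
payload = serialize({'row': 'Diana Lorentz'})
message_value = deserialize(payload)

print(type(payload).__name__)   # bytes
print(message_value['row'])     # Diana Lorentz
```

If a producer is written with kafka-python, a function like `serialize` would typically be passed as its `value_serializer` so that both sides agree on the encoding.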