## Open a terminal window and run the following commands:
sudo bash
start-hadoop
cd /home/student/ROI/SparkProgram/Day3
./fixhive.sh


## Let's make a simple hive table for regions.

In [None]:
! cat /class/regions.hql
# hive -f regions.hql 
#! /class/fixhive.sh


In [1]:
import sys
sys.path.append('/class')
from initspark import *
sc, spark, conf = initspark()


initializing pyspark
pyspark initialized


## You can query an existing Hive table and bring it into a Spark DataFrame.

In [9]:
# CREATE EXTERNAL TABLE regions(regionid int, regionname string)
# ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
# LINES TERMINATED BY '\n' 
# STORED AS TEXTFILE
# LOCATION '/regions';
regions = spark.read.table('regions')

region_schema = StructType([
    StructField('regionid', IntegerType()), 
    StructField('regionname', StringType())
])

regions = spark.read.csv('hdfs://localhost:9000/regions', schema = region_schema, header = False, sep = ',')

                         
regions = spark.sql('select * from regions')
regions = spark.sql('select regionid, regionname from regions')

#regions = spark.sql('select * from northwind.regions')

regions = spark.sql('''
select * 
from northwind.regions
''')

regions.show()

query = '''
select * 
from northwind.regions
'''

display(spark.sql(query))

print(regions)


+--------+----------+
|regionid|regionname|
+--------+----------+
|       1|   Eastern|
|       2|   Western|
|       3|  Northern|
|       4|  Southern|
+--------+----------+



### Really the SQL is being converted to something like the following Spark commands.

In [None]:
regionschema = StructType([
    StructField('regionid', IntegerType()), 
    StructField('regionname', StringType())
])

regions2 = spark.read.csv('hdfs://localhost:9000/regions', header=False
                         , schema = regionschema, sep = ',')
display(regions2)

### You can install this magic add in which lets you call SparkSQL directly

In [11]:
# pip3 install sparksql-magic
%load_ext sparksql_magic

In [13]:
%%sparksql
select * from regions


0,1
RegionID,RegionName
1,Eastern
2,Western
3,Northern
4,Southern


### Can also capture the results to a variable and use it later

In [14]:
%%sparksql regions4
select regionname, regionid from regions where regionid < 4


capture dataframe to local variable `regions4`


0,1
regionname,regionid
Eastern,1
Western,2
Northern,3


In [None]:
regions4 = spark.sql('select regionname, regionid from regions where regionid < 4')

In [17]:
sort_field = 'regionname'
display(regions4.orderBy(sort_field))

Unnamed: 0,regionname,regionid
0,Eastern,1
1,Northern,3
2,Western,2


In [45]:
regions4 = spark.sql('select regionname, regionid from regions where regionid < 4 order by regionname')
regions4 = spark.sql('select regionname, regionid from regions where regionid < 4').sort('regionname')
regions4 = spark.sql('select regionname, regionid from regions').filter('regionid < 4').sort('regionname')


In [46]:
regionschema = StructType([
    StructField('regionid', IntegerType()), 
    StructField('regionname', StringType())
])

regions2 = (spark.read.csv('hdfs://localhost:9000/regions', header=False
                         , schema = regionschema, sep = ',')
                .select('regionname', 'regionid')
                .where('regionid < 4')
                .sort('regionid'))
display(regions2)

Unnamed: 0,regionname,regionid
0,Eastern,1
1,Western,2
2,Northern,3


In [None]:
regions.collect()

## Read in a file to a Spark DataFrame.

In [20]:
# spark.sql('select * from territories').show()

In [21]:
territories = spark.read.csv('file:///class/datasets/northwind/CSVHeaders/territories'
                             , inferSchema = True, header=True)
territories.show()


+-----------+-------------+--------+
|TerritoryID|TerritoryName|RegionID|
+-----------+-------------+--------+
|       1581|     Westboro|       1|
|       1730|      Bedford|       1|
|       1833|    Georgetow|       1|
|       2116|       Boston|       1|
|       2139|    Cambridge|       1|
|       2184|    Braintree|       1|
|       2903|   Providence|       1|
|       3049|       Hollis|       3|
|       3801|   Portsmouth|       3|
|       6897|       Wilton|       1|
|       7960|   Morristown|       1|
|       8837|       Edison|       1|
|      10019|     New York|       1|
|      10038|     New York|       1|
|      11747|     Mellvile|       1|
|      14450|     Fairport|       1|
|      19428| Philadelphia|       3|
|      19713|       Neward|       1|
|      20852|    Rockville|       1|
|      27403|   Greensboro|       1|
+-----------+-------------+--------+
only showing top 20 rows



## Use createOrReplaceTempView to create a virtual table in the Hive catalog and then it can be queried using SQL as if it were a hive table.

In [22]:
display(territories)

Unnamed: 0,TerritoryID,TerritoryName,RegionID
0,1581,Westboro,1
1,1730,Bedford,1
2,1833,Georgetow,1
3,2116,Boston,1
4,2139,Cambridge,1
5,2184,Braintree,1
6,2903,Providence,1
7,3049,Hollis,3
8,3801,Portsmouth,3
9,6897,Wilton,1


In [23]:
territories.createOrReplaceTempView('territories')
t1 = spark.sql('select RegionID, TerritoryID, TerritoryName from territories where regionid = 1')
t1.show()
print(t1.count())

display(territories.select('RegionID', 'TerritoryID', 'TerritoryName').filter('RegionID=1'))

+--------+-----------+-------------+
|RegionID|TerritoryID|TerritoryName|
+--------+-----------+-------------+
|       1|       1581|     Westboro|
|       1|       1730|      Bedford|
|       1|       1833|    Georgetow|
|       1|       2116|       Boston|
|       1|       2139|    Cambridge|
|       1|       2184|    Braintree|
|       1|       2903|   Providence|
|       1|       6897|       Wilton|
|       1|       7960|   Morristown|
|       1|       8837|       Edison|
|       1|      10019|     New York|
|       1|      10038|     New York|
|       1|      11747|     Mellvile|
|       1|      14450|     Fairport|
|       1|      19713|       Neward|
|       1|      20852|    Rockville|
|       1|      27403|   Greensboro|
|       1|      27511|         Cary|
|       1|      40222|   Louisville|
+--------+-----------+-------------+

19


Unnamed: 0,RegionID,TerritoryID,TerritoryName
0,1,1581,Westboro
1,1,1730,Bedford
2,1,1833,Georgetow
3,1,2116,Boston
4,1,2139,Cambridge
5,1,2184,Braintree
6,1,2903,Providence
7,1,6897,Wilton
8,1,7960,Morristown
9,1,8837,Edison


In [24]:
%%sparksql
select r.regionid, r.regionname, t.territoryid, t.territoryname
from regions as r
join territories as t on r.regionid = t.regionid
order by r.regionid, t.territoryid


only showing top 20 row(s)


0,1,2,3
regionid,regionname,territoryid,territoryname
1,Eastern,1581,Westboro
1,Eastern,1730,Bedford
1,Eastern,1833,Georgetow
1,Eastern,2116,Boston
1,Eastern,2139,Cambridge
1,Eastern,2184,Braintree
1,Eastern,2903,Providence
1,Eastern,6897,Wilton
1,Eastern,7960,Morristown


## Spark DataFrames can be saved to a Hive table using either the saveAsTable method or writing a SQL query that uses CREATE TABLE AS.

In [None]:
help(territories.write.saveAsTable)

In [27]:
#territories.write.saveAsTable('territories2', mode='overwrite')
#territories.write.saveAsTable('territories3', mode='overwrite', format='csv')

#spark.sql("create table territories4 STORED AS AVRO LOCATION '/territories4' as select * from territories")


DataFrame[]

## Confirm that the tables exist in Hive.
From the command line type hive or spark-sql and then show tables;


In [None]:
#! hive -e "show tables;"
! spark-sql "show tables;"

## Queries use standard HQL to mix Hive tables and virtual tables. Both are read into a Spark DataFrame and the processing happens at the Spark level, not at the Hive level. HQL is just used to parse the logic into the corresponding Spark methods.

In [29]:
query = """
select r.regionid, r.regionname, t.territoryid, t.territoryname 
from regions as r 
join territories as t on r.regionid = t.regionid 
order by r.regionid, t.territoryid
"""
rt = spark.sql(query)
rt.show(10)

tr = regions.join(territories, regions.regionid == territories.RegionID). \
     select('regions.regionid', 'regionname', 'TerritoryID', 'TerritoryName') .\
    orderBy('regionid', 'territoryid')
tr.show(10)


+--------+----------+-----------+-------------+
|regionid|regionname|territoryid|territoryname|
+--------+----------+-----------+-------------+
|       1|   Eastern|       1581|     Westboro|
|       1|   Eastern|       1730|      Bedford|
|       1|   Eastern|       1833|    Georgetow|
|       1|   Eastern|       2116|       Boston|
|       1|   Eastern|       2139|    Cambridge|
|       1|   Eastern|       2184|    Braintree|
|       1|   Eastern|       2903|   Providence|
|       1|   Eastern|       6897|       Wilton|
|       1|   Eastern|       7960|   Morristown|
|       1|   Eastern|       8837|       Edison|
+--------+----------+-----------+-------------+
only showing top 10 rows

+--------+----------+-----------+-------------+
|regionid|regionname|TerritoryID|TerritoryName|
+--------+----------+-----------+-------------+
|       1|   Eastern|       1581|     Westboro|
|       1|   Eastern|       1730|      Bedford|
|       1|   Eastern|       1833|    Georgetow|
|       1|   E

## LAB: ## 
### Read the northwind JSON products and make it into a TempView and do the same with the CSVHeaders version of categories.
<br>
<details><summary>Click for <b>hint</b></summary>
<p>
Look at Day 2 or below to copy the code to load in the DataFrames
<br>
Turn each DataFrame into a temporary view
<br>
<br>
</p>
</details>

<details><summary>Click for <b>code</b></summary>
<p>

```python
categories = spark.read.load('file:///class/datasets/northwind/CSVHeaders/categories', format = 'csv', sep = ',', inferSchema = True, header = True)

prodSchema = StructType([
    StructField('productid', IntegerType()), 
    StructField('productname', StringType()),
    StructField('supplierid', IntegerType()), 
    StructField('categoryid', IntegerType()), 
    StructField('quantityperunit', StringType()), 
    StructField('unitprice', FloatType()), 
    StructField('unitsinstock', IntegerType()), 
    StructField('unitsonorder', IntegerType()), 
    StructField('reorderlevel', IntegerType()), 
    StructField('discontinued', IntegerType())
])
products = spark.read.json('file:///class/datasets/northwind/JSON/products', schema=prodSchema)

categories.createOrReplaceTempView('categories')
products.createOrReplaceTempView('products')```
</p>
</details>

In [32]:
%%sparksql
select * from products as p
join categories as c on p.categoryid = c.categoryid


only showing top 20 row(s)


0,1,2,3,4,5,6,7,8,9,10,11,12
productid,productname,supplierid,categoryid,quantityperunit,unitprice,unitsinstock,unitsonorder,reorderlevel,discontinued,CategoryID,CategoryName,Description
1,Chai,8,1,10 boxes x 30 bags,18.0,39,0,10,1,1,Beverages,Soft drinks coffees teas beers and ales
2,Chang,1,1,24 - 12 oz bottles,19.0,17,40,25,1,1,Beverages,Soft drinks coffees teas beers and ales
3,Aniseed Syrup,1,2,12 - 550 ml bottles,10.0,13,70,25,0,2,Condiments,Sweet and savory sauces relishes spreads and seasonings
4,Chef Anton's Cajun Seasoning,2,2,48 - 6 oz jars,22.0,53,0,0,0,2,Condiments,Sweet and savory sauces relishes spreads and seasonings
5,Chef Anton's Gumbo Mix,2,2,36 boxes,21.350000381469727,0,0,0,1,2,Condiments,Sweet and savory sauces relishes spreads and seasonings
6,Grandma's Boysenberry Spread,3,2,12 - 8 oz jars,25.0,120,0,25,0,2,Condiments,Sweet and savory sauces relishes spreads and seasonings
7,Uncle Bob's Organic Dried Pears,3,7,12 - 1 lb pkgs.,30.0,15,0,10,0,7,Produce,Dried fruit and bean curd
8,Northwoods Cranberry Sauce,3,2,12 - 12 oz jars,40.0,6,0,0,0,2,Condiments,Sweet and savory sauces relishes spreads and seasonings
9,Mishi Kobe Niku,4,6,18 - 500 g pkgs.,97.0,29,0,0,1,6,Meat/Poultry,Prepared meats


## Install the MySQL Python connector. This has nothing to do with Spark, but if you want to run SQL queries directly it is helpful.
It's already on our machines so we don't need to run this now.

In [None]:
! pip install mysql-connector-python

## Let's make sure we have a database for northwind and no regions table.

In [33]:
import mysql.connector
try:
    cn = mysql.connector.connect(host='localhost', user='test', password='password')
    cursor = cn.cursor()
    cursor.execute('create database if not exists northwind')
    cn.close()

    cn = mysql.connector.connect(host='localhost', user='test', password='password', database='northwind')
    cursor = cn.cursor()    
    cursor.execute('drop table if exists regions')
    cn.close()
except:
    print('something went wrong')
else:
    print('success')



success


## Write a DataFrame to a SQL database.

In [37]:
regions.show()

+--------+----------+
|regionid|regionname|
+--------+----------+
|       1|   Eastern|
|       2|   Western|
|       3|  Northern|
|       4|  Southern|
+--------+----------+



In [38]:
regions.write.format("jdbc").options(url="jdbc:mysql://localhost/northwind", \
                                     driver='com.mysql.jdbc.Driver', \
                                     dbtable='regions', \
                                     user='test', password = "password", mode = "append", \
                                     useSSL = "false").save()
print('Done')

Done


## Read a SQL table into a Spark DataFrame.

In [40]:
regions2 = spark.read.format("jdbc").options(url="jdbc:mysql://localhost/northwind", \
                                             driver="com.mysql.jdbc.Driver", \
                                             dbtable= "regions", \
                                             user="test", password="password").load()
regions2.show()


+--------+----------+
|regionid|regionname|
+--------+----------+
|       1|   Eastern|
|       2|   Western|
|       3|  Northern|
|       4|  Southern|
|       5|New Region|
+--------+----------+



## If you don't wont to bring an entire SQL table into a spark DataFrame use query instead of dbtable.

In [41]:
regions3 = spark.read.format("jdbc").options(url="jdbc:mysql://localhost/northwind", \
                                             driver="com.mysql.jdbc.Driver", \
                                             query= "select regionid, regionname from regions where regionid < 3", \
                                             user="test", password="password").load()
regions3.show()



+--------+----------+
|regionid|regionname|
+--------+----------+
|       1|   Eastern|
|       2|   Western|
+--------+----------+



## Creating the regions2 DataFrame does not execute anything yet, but by making the DataFrame into a Temp View then running a Spark SQL query, it tells Spark to read the SQL data into a DataFrame and then use the cluster to do the processing, not the SQL source.

In [42]:
regions2.createOrReplaceTempView('regions2')
spark.sql('select * from regions2 where regionid < 3').show()

+--------+----------+
|regionid|regionname|
+--------+----------+
|       1|   Eastern|
|       2|   Western|
+--------+----------+



In [None]:
spark.read.table('regions2').where('regionid < 3').show()

## Alternate ways to code a query using SQL and methods.

In [None]:
x = spark.sql('select count(*) from northwind.regions').collect()
print(x)
print(x[0], x[0]['count(1)'])

x = spark.sql('select count(*) as cnt from northwind.regions').collect()
print(x)
print(x[0], x[0].cnt)

print(spark.sql('select * from northwind.regions').count())
print(spark.read.table('northwind.regions').count())

print(spark.sql('select * from northwind.regions where regionid < 4').count())
print(spark.read.table('northwind.regions').where('regionid<4').count())

## Using SQL you can use familiar syntax instead of withColumn or withColumnRenamed methods.
Note the expr function needs to be imported when you want to use a stringified SQL function using dot syntax.

In [None]:
from pyspark.sql.functions import expr

t1 = spark.sql('select TerritoryID as TerrID, UPPER(TerritoryName) as TerritoryName, RegionID from northwind.territories')
t1.show(5)

from pyspark.sql.functions import expr
territories.withColumn('TerritoryName', expr('UPPER(TerritoryName)')).withColumnRenamed('TerritoryID', 'TerrID').show(5)


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# This won't work though if you want to use python functions, you need to go another step
# territories.withColumn('TerritoryName', territories.TerritoryName.upper()).show()

# You need to make the python function callable by spark by wrapping it in the udf function
# which tells spark what datatype it returns

def joeyfunc(x: str) -> str:
    return x[::-1]

#territories.withColumn('TerritoryName', udf(str.title, StringType())(territories.TerritoryName)).show()
territories.withColumn('TerritoryName', udf(joeyfunc, StringType())(territories.TerritoryName)).show()



## If you want to use a function that is not a standard Python or SQL function, you can always create one in Python and make it callable from Spark.

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def inventoryvalue(quantity: int, price: float) -> float:
    return quantity * price

def inventoryvalue(quantity, price):
    return quantity * price

# Turn the Python function into a Spark callable function
invvalue = udf(inventoryvalue, FloatType())
p = products
p2 = p.withColumn('value', invvalue(p.unitsinstock, p.unitprice))
display(p2)




## Python decorators are an even better option.

In [None]:
@udf(FloatType())
def inventoryvalue(quantity, price):
    return quantity * price

p2 = p.withColumn('value', inventoryvalue(p.unitsinstock, p.unitprice))
display(p2)


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def inventoryvalue(quantity, price):
    return quantity * price

# Or dynamically wrap it, but it's harder to read
p2 = p.withColumn('value', udf(inventoryvalue, FloatType())(p.unitsinstock, p.unitprice))
display(p2)



## To make it easier though, you could make the Python function into a udf that SQL can understand similar to how you can make a DataFrame seem like a virtual table with createOrReplaceTempView.

In [44]:
def reverseString(x):
    return x[::-1]

spark.udf.register('reverse', reverseString, StringType())

spark.sql('select *, reverse(TerritoryName) as Reversed from Territories').orderBy('Reversed').show()

+-----------+-------------+--------+----------------+
|TerritoryID|TerritoryName|RegionID|        Reversed|
+-----------+-------------+--------+----------------+
|      90405| Santa Monica|       2|joeyacinoM atnaS|
|      29202|     Columbia|       4|    joeyaibmuloC|
|      19428| Philadelphia|       3|joeyaihpledalihP|
|      33607|        Tampa|       4|       joeyapmaT|
|      95054|  Santa Clara|       2| joeyaralC atnaS|
|      30346|      Atlanta|       4|     joeyatnaltA|
|      48075|   Southfield|       3|  joeydleifhtuoS|
|      98052|      Redmond|       2|     joeydnomdeR|
|      44122|    Beachwood|       3|   joeydoowhcaeB|
|      19713|       Neward|       1|      joeydraweN|
|       1730|      Bedford|       1|     joeydrofdeB|
|       2903|   Providence|       1|  joeyecnedivorP|
|       2184|    Braintree|       1|   joeyeertniarB|
|       2139|    Cambridge|       1|   joeyegdirbmaC|
|      85251|   Scottsdale|       2|  joeyeladsttocS|
|      11747|     Mellvile| 

## HQL has collect_set and collect_list functions to aggregate items into a list instead of summing them up. 

In [None]:
from pyspark.sql.functions import collect_list
territories.groupBy(territories.RegionID).agg(collect_list(territories.TerritoryName)).show()

tr1 = spark.sql("SELECT RegionID, collect_list(TerritoryName) AS TerritoryList FROM Territories GROUP BY RegionID")
tr1.show()
tr1.printSchema()
print(tr1.take(1))


## Instead of a simple datatype, you could also collect complex structured objects using the HQL NAMED_STRUCT.

In [None]:
sql = """
SELECT r.RegionID, r.RegionName
, COLLECT_SET(NAMED_STRUCT("TerritoryID", TerritoryID, "TerritoryName", TerritoryName)) AS TerritoryList
FROM northwind.Regions AS r
JOIN Territories AS t ON r.RegionID = t.RegionID
GROUP BY r.RegionID, r.RegionName
ORDER BY r.RegionID
"""

tr2 = spark.sql(sql)
tr2.printSchema()
print(tr2)
tr2.show()
print(tr2.take(2))
tr2.write.json('TerritoryRegion.json')
spark.sql('create table TerritoryRegion as ' + sql)

## If you have data that is already collected into a complex datatype and want to flatten it, you could use HQL EXPLODE function.

## You could use the Spark explode method.

In [None]:
from pyspark.sql.functions import explode
tr1.select('RegionID', explode('TerritoryList')).withColumnRenamed('col','TerritoryName').show()


## Or if the DataFrame is turned into a Temp View, you could use the HQL query to do it.

In [None]:
tr1.createOrReplaceTempView('RegionTerritories')
sql = """
SELECT RegionID, TerritoryName
FROM RegionTerritories
LATERAL VIEW EXPLODE(TerritoryList) EXPLODED_TABLE AS TerritoryName
ORDER BY RegionID, TerritoryName
"""
spark.sql(sql).show()

## Or you could select specific elements from a collection.

In [None]:
tr2.createOrReplaceTempView('RegionTerritories')
spark.sql("select RegionId, RegionName, TerritoryList[0] as First, TerritoryList[size(TerritoryList) - 1] as Last, size(TerritoryList) as TerritoryCount from RegionTerritories").show()


## If the array is of structs, note the syntax of fetching the elements from the struct uses the . like an object property.

In [None]:
sql = """
SELECT RegionID, RegionName, Territory.TerritoryID AS TerritoryID
, Territory.TerritoryName AS TerritoryName
FROM RegionTerritories
LATERAL VIEW EXPLODE(TerritoryList) EXPLODED_TABLE AS Territory
"""
spark.sql(sql).show()


## HOMEWORK: ## 
**First Challenge**

Create a Python function to determine if a number is odd or even and use that to select only the even numbered shippers from the TSV folder of northwind. Note the TSV file does not have headers so you will need to do something to make the DataFrame have a meaningful structure. I would suggest using Spark SQL as much as possible to rename and cast the columns which are ShipperID, CompanyName, and Phone.

**Second Challenge**

Take the Order_LineItems.json folder, read it into a DataFrame, and flatten it and then calculate the average price paid for a product.

<br>
<details><summary>Click for <b>hint</b></summary>
<p>
Take a look at the MakeOrders_LineItems.py file provided to see how the Order_LineItems.json was generated in the first place
<br>
Use modulus with remainder of zero to determine if something is even
<br>
Use udf to make a version of the function that is callable using dot syntax and udf.register to make a version callable from within a SQL string
<br>
Use LATERAL VIEW EXPLODE() EXPLODED_TABLE to flatten out the nested format file
<br>
Once flattened do a traditional aggregate to calculate the average
<br>
<br>
</p>
</details>





In [None]:
%load_ext sparksql_magic

In [None]:
%%sparksql
select * from regions