In [1]:
import pandas as pd
import numpy as np
import os
from pyspark import SparkConf 
from pyspark.sql import SparkSession # https://spark.apache.org/docs/1.6.1/sql-programming-guide.html
from pyspark.sql import functions as F # access to the sql functions https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions
from IPython.display import HTML

In [2]:
# To persist our sparksql data from session to session we will use derby.
warehouse_location = os.path.abspath('../../../data/spark-warehouse')
java_options = "-Dderby.system.home=" + warehouse_location
print(warehouse_location)
print(java_options)
# make sure you have set the warehouse location to 'home/jovyan/data/spark-warehouse'

/home/jovyan/data/spark-warehouse
-Dderby.system.home=/home/jovyan/data/spark-warehouse


In [3]:
if os.path.normpath("/home/jovyan/data/spark-warehouse") != warehouse_location:
    print('\x1b[6;37;41m' + 'Your path is not correct' + '\x1b[0m')

In [4]:
# Create the session
conf = (SparkConf()
    .set("spark.ui.port", "4041")
    .set('spark.jars', '/home/jovyan/scratch/postgresql-42.2.18.jar')
    .set("spark.sql.warehouse.dir", warehouse_location) # set above.
    .set("hive.metastore.schema.verification", False)
    .set("javax.jdo.option.ConnectionURL", "jdbc:derby:;databaseName=metastore_db;create=true") # settings to persist data for sparksql
    .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver") # settings to persist data for sparksql
    .set("javax.jdo.option.ConnectionUserName", 'userman') # may not need this
    .set("jdo.option.ConnectionPassword", "pwd") # may not need this
    .set("spark.driver.extraJavaOptions",java_options) # setting where the derby log files are created.
    .set("spark.sql.inMemoryColumnarStorage.compressed", True) # default
    .set("spark.sql.inMemoryColumnarStorage.batchSize",10000) # default
    )

# Create the Session (used to be context)
# you can move the number up or down depending on your memory and processors "local[*]" will use all.
# The conf from above is used in .config.
spark = SparkSession.builder \
    .master("local[3]") \
    .appName('test') \
    .config(conf=conf) \
    .enableHiveSupport() \
    .getOrCreate()

In [None]:
# spark.stop()
# spark.sql('DROP DATABASE IF EXISTS irs990 CASCADE;')
# spark.sql("create database irs990")
spark.sql("SHOW DATABASES").show()

## Pulling data into our Spark environment

In this example, I am using a postgres database as specified in the below properties based on the 990 irs database we are using in CSE 451. If you don't have access to the database, you use the `.csv` file found in `/scratch`. 

### pushing our csv file to a database

In [32]:
spark.sql('DROP DATABASE IF EXISTS mycsv CASCADE;')
spark.sql("create database mycsv")
spark.sql("USE mycsv")
diamonds = spark.read.format("csv").load("../../../scratch/diamonds.csv")
print(diamonds.limit(5).show(5))
diamonds.write.mode("overwrite").saveAsTable("diamonds")
spark.sql('SHOW TABLES IN mycsv').show()


+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|  _c0|    _c1|  _c2|    _c3|  _c4|  _c5|  _c6| _c7| _c8| _c9|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
| 0.23|  Ideal|    E|    SI2| 61.5|   55|  326|3.95|3.98|2.43|
| 0.21|Premium|    E|    SI1| 59.8|   61|  326|3.89|3.84|2.31|
| 0.23|   Good|    E|    VS1| 56.9|   65|  327|4.05|4.07|2.31|
| 0.29|Premium|    I|    VS2| 62.4|   58|  334| 4.2|4.23|2.63|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+

None
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|   mycsv| diamonds|      false|
+--------+---------+-----------+



### Pulling from our postgress irs990 database into our Spark database

For our CSE 451 students that have access to our database the following sections can be used to see the performance benefits of Spark for large tables.

In [33]:
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://c451_db_1:5432/irs990',
    'user': 'postgres',
    'password': 'postgres1234',
}

In [34]:
# schema table

schema = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('dbtable', "information_schema.tables") \
    .option('user', properties['user']) \
    .load()



In [35]:
tables = schema.toPandas().filter(['table_catalog', 'table_schema', 'table_name']).query('table_schema == "public"')

In [36]:
HTML(tables.to_html())

Unnamed: 0,table_catalog,table_schema,table_name
0,irs990,public,address_table
1,irs990,public,cmsid_eins
2,irs990,public,django_migrations
3,irs990,public,excess_benefits
4,irs990,public,excess_benefits_types
5,irs990,public,filing_filing
6,irs990,public,fl_loans_from
7,irs990,public,insider_assistance
8,irs990,public,insider_assistance_types
9,irs990,public,insider_transactions


In [1]:
# import shutil
# shutil.rmtree('/home/jovyan/data/spark-warehouse/metastore_db')
# shutil.rmtree('/home/jovyan/data/spark-warehouse')
# shutil.rmtree('/home/jovyan/data/spark-warehouse/irs990.db/address_table')
# shutil.rmtree('/home/jovyan/data/spark-warehouse/irs990.db/return_EZOffcrDrctrTrstEmpl')
# shutil.rmtree('/home/jovyan/data/spark-warehouse/irs990.db/tmp_990ez_employees')

# spark.sql('DROP TABLE IF EXISTS return_EZOffcrDrctrTrstEmpl')
# spark.sql('DROP TABLE IF EXISTS address_table')
# spark.sql('DROP TABLE IF EXISTS tmp_990ez_employees')

In [38]:
return_EZOffcrDrctrTrstEmpl = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('dbtable', "return_EZOffcrDrctrTrstEmpl") \
    .option('user', properties['user']) \
    .load()

In [39]:
address = spark.read \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('dbtable', 'address_table') \
    .option('user', properties['user']) \
    .load()

In [40]:
print(address.count())
print(return_EZOffcrDrctrTrstEmpl.count())

2243560
4698799


In [41]:
print('Number of partitions: {}'.format(address.rdd.getNumPartitions()))
print('Number of partitions: {}'.format(return_EZOffcrDrctrTrstEmpl.rdd.getNumPartitions()))

Number of partitions: 1
Number of partitions: 1


In [42]:
address = address.repartition(50)

In [43]:
return_EZOffcrDrctrTrstEmpl = return_EZOffcrDrctrTrstEmpl.repartition(50)

In [44]:
print('Number of partitions: {}'.format(address.rdd.getNumPartitions()))
print('Number of partitions: {}'.format(return_EZOffcrDrctrTrstEmpl.rdd.getNumPartitions()))

Number of partitions: 50
Number of partitions: 50


In [45]:
spark.sql("USE irs990")
address.write.saveAsTable("address_table")
return_EZOffcrDrctrTrstEmpl.write.saveAsTable('return_EZOffcrDrctrTrstEmpl')

In [6]:
spark.sql('SHOW TABLES IN irs990').show()


+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|  irs990|       address_table|      false|
|  irs990|return_ezoffcrdrc...|      false|
|  irs990| tmp_990ez_employees|      false|
+--------+--------------------+-----------+



In [48]:
%%time
sql_query_join = """
SELECT addt.*, rez.PrsnNm, rez.TtlTxt, rez.CmpnstnAmt
FROM return_EZOffcrDrctrTrstEmpl as rez
    LEFT JOIN address_table as addt
    ON rez.ein = addt.ein
    AND rez.object_id = addt.object_id
    ORDER BY addt.ein DESC, addt.object_id DESC;
"""

tmp_990ez_employees = spark.sql(sql_query_join)

tmp_990ez_employees.write.saveAsTable('tmp_990ez_employees')


CPU times: user 8.82 ms, sys: 5.94 ms, total: 14.8 ms
Wall time: 49 s


In [None]:
# https://github.com/jsfenfen/990-xml-database/blob/master/directors.sh

# DROP TABLE IF EXISTS tmp_990ez_employees;
# SELECT address_table.*,
# 	'/IRS990EZ' as form,
#    "PrsnNm",
#    "TtlTxt",
#    "CmpnstnAmt" 
#    INTO temporary table tmp_990EZ_employees
#    FROM return_EZOffcrDrctrTrstEmpl
# 	LEFT JOIN address_table ON return_EZOffcrDrctrTrstEmpl.ein = address_table.ein
# 	AND return_EZOffcrDrctrTrstEmpl.object_id= address_table.object_id;

In [50]:
temp = spark.sql('select * from tmp_990ez_employees')
temp.limit(20).toPandas()

Unnamed: 0,ein,object_id,RtrnHdr_TxPrdEndDt,RtrnHdr_TxYr,BsnssNm_BsnssNmLn1Txt,BsnssNm_BsnssNmLn2Txt,BsnssOffcr_PrsnNm,BsnssOffcr_PrsnTtlTxt,BsnssOffcr_PhnNm,BsnssOffcr_EmlAddrssTxt,...,USAddrss_SttAbbrvtnCd,USAddrss_ZIPCd,FrgnAddrss_AddrssLn1Txt,FrgnAddrss_AddrssLn2Txt,FrgnAddrss_CtyNm,FrgnAddrss_PrvncOrSttNm,FrgnAddrss_CntryCd,PrsnNm,TtlTxt,CmpnstnAmt
0,10674021,201411229349200611,2013-12-31,2013,Clinton Community Faculty Association,,Chris Ford,President,,,...,NY,12901,,,,,,Catherine Eloranto,President,2433
1,10674021,201411229349200611,2013-12-31,2013,Clinton Community Faculty Association,,Chris Ford,President,,,...,NY,12901,,,,,,Darcy Purick,Representative,350
2,10674021,201411229349200611,2013-12-31,2013,Clinton Community Faculty Association,,Chris Ford,President,,,...,NY,12901,,,,,,June Foley,Representative,986
3,10674021,201411229349200611,2013-12-31,2013,Clinton Community Faculty Association,,Chris Ford,President,,,...,NY,12901,,,,,,Chris Ford,President,2433
4,10674021,201411229349200611,2013-12-31,2013,Clinton Community Faculty Association,,Chris Ford,President,,,...,NY,12901,,,,,,Angela Barnaby,Representative,0
5,10674021,201411229349200611,2013-12-31,2013,Clinton Community Faculty Association,,Chris Ford,President,,,...,NY,12901,,,,,,Michael Lawliss,Representative,175
6,10674021,201411229349200611,2013-12-31,2013,Clinton Community Faculty Association,,Chris Ford,President,,,...,NY,12901,,,,,,Donna Lynch,representative,175
7,10674021,201411229349200611,2013-12-31,2013,Clinton Community Faculty Association,,Chris Ford,President,,,...,NY,12901,,,,,,Willow Nolland,representative,175
8,10674021,201411229349200611,2013-12-31,2013,Clinton Community Faculty Association,,Chris Ford,President,,,...,NY,12901,,,,,,Lynn Fowler,Vice President,811
9,10674021,201411229349200611,2013-12-31,2013,Clinton Community Faculty Association,,Chris Ford,President,,,...,NY,12901,,,,,,Patti LaDuke,Treasurer,1622
