# Understanding SparkContext

- A `SparkContext` represents the entry point to Spark functionality. It's like a key to your car. PySpark automatically creates a `SparkContext` for you in the PySpark shell (so you don't have to create it yourself) and exposes it through the variable `sc`.

- In this simple exercise, you'll find out the attributes of the `SparkContext` in your PySpark shell, which you'll be using for the rest of the course.

## Instructions
- Print the version of `SparkContext` in the PySpark shell.
- Print the Python version of `SparkContext` in the PySpark shell.
- What is the master of `SparkContext` in the PySpark shell?

In [59]:
# Initialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2.
# Driver and workers must run the same Python minor version, or PySpark refuses to run.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: List the extra packages you need in --packages (comma-separated Maven coordinates); this must be set before the SparkSession is created.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [60]:
# Entry point (Spark 2.x)
from pyspark.sql import SparkSession
#spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()
spark = SparkSession.builder.appName("ParquetLoader").getOrCreate()

# On YARN, specify .master("yarn"):
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()

sc = spark.sparkContext

In [66]:
df = spark.read.parquet("/data_parquet/Savings/file.parquet")
#df = spark.read.parquet("hdfs://localhost:9000/data/file.parquet")
#df = spark.read.csv("/1.txt")


In [62]:
df.show()

+----------+----------+--------+---------------+----------+----+-----+---+------+
|   cust_id|start_date|end_date|       trans_id|      date|year|month|day|amount|
+----------+----------+--------+---------------+----------+----+-----+---+------+
|CI6XLYUMQK|2015-05-01|        |TVVQUIQYIHEK5QR|2019-08-07|2019|    8|  7| 56.37|
|CI6XLYUMQK|2015-05-01|        |TS6S4FUUAOSAQYS|2020-11-01|2020|   11|  1|108.03|
|CI6XLYUMQK|2015-05-01|        |TWCMZ0Y64G2Y678|2017-02-02|2017|    2|  2|  46.4|
|CI6XLYUMQK|2015-05-01|        |TULILEKRT5YXMLU|2017-06-14|2017|    6| 14| 83.26|
|CI6XLYUMQK|2015-05-01|        |TQE2PBES0CNUQZN|2018-08-13|2018|    8| 13| 54.31|
|CI6XLYUMQK|2015-05-01|        |T3MDSUNY6W13SRH|2017-12-01|2017|   12|  1| 76.93|
|CI6XLYUMQK|2015-05-01|        |TM1839JYXJLNAOH|2019-10-24|2019|   10| 24| 162.0|
|CI6XLYUMQK|2015-05-01|        |TWGI1XNVSVCIO0V|2017-10-03|2017|   10|  3|146.92|
|CI6XLYUMQK|2015-05-01|        |T7QRXSQROKVBCSX|2016-08-12|2016|    8| 12| 74.66|
|CI6XLYUMQK|2015

In [41]:
df.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- trans_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- year: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day: long (nullable = true)
 |-- amount: double (nullable = true)



In [50]:
df.describe().select(['summary','amount']).show()

+-------+------------------+
|summary|            amount|
+-------+------------------+
|  count|           7762038|
|   mean|179.36878004462466|
| stddev| 233.4165596596204|
|    min|               0.4|
|    max|           4319.49|
+-------+------------------+



In [5]:
df.createOrReplaceTempView("data_analysis")

In [11]:
df1 = spark.sql("SELECT count(*) from data_analysis where end_date is NULL")
df1.collect()

[Row(count(1)=7762038)]

In [67]:
# The count above shows end_date is NULL in every row, so drop the column
df1=df.drop('end_date')
df1.show(5)

+----------+----------+---------------+----------+----+-----+---+------+
|   cust_id|start_date|       trans_id|      date|year|month|day|amount|
+----------+----------+---------------+----------+----+-----+---+------+
|CI6XLYUMQK|2015-05-01|T6TYXHDKMA4XWFQ|2019-10-01|2019|   10|  1|104.02|
|CI6XLYUMQK|2015-05-01|TGB9D2A2987RWA5|2016-11-09|2016|   11|  9| 37.66|
|CI6XLYUMQK|2015-05-01|TH0135O7BZ4OKQ2|2019-05-01|2019|    5|  1| 73.81|
|CI6XLYUMQK|2015-05-01|TL4JWOPGCM8LFP5|2020-10-24|2020|   10| 24| 38.06|
|CI6XLYUMQK|2015-05-01|TTZM5V3A24LVK52|2015-08-04|2015|    8|  4| 50.26|
+----------+----------+---------------+----------+----+-----+---+------+
only showing top 5 rows



In [68]:
# year, month and day only repeat what the date column already holds, so drop them to remove redundant data and reduce what later queries have to process
df1=df1.drop('year','month','day')
df1.show(5)

+----------+----------+---------------+----------+------+
|   cust_id|start_date|       trans_id|      date|amount|
+----------+----------+---------------+----------+------+
|CI6XLYUMQK|2015-05-01|T6TYXHDKMA4XWFQ|2019-10-01|104.02|
|CI6XLYUMQK|2015-05-01|TGB9D2A2987RWA5|2016-11-09| 37.66|
|CI6XLYUMQK|2015-05-01|TH0135O7BZ4OKQ2|2019-05-01| 73.81|
|CI6XLYUMQK|2015-05-01|TL4JWOPGCM8LFP5|2020-10-24| 38.06|
|CI6XLYUMQK|2015-05-01|TTZM5V3A24LVK52|2015-08-04| 50.26|
+----------+----------+---------------+----------+------+
only showing top 5 rows



In [69]:
df1.createOrReplaceTempView("data_analysis1")
df2 = spark.sql("select count(*) from (SELECT distinct(cust_id) from data_analysis1)")
df2.show(5)

+--------+
|count(1)|
+--------+
|   28079|
+--------+



In [70]:
df1.createOrReplaceTempView("data_analysis1")
df3 = spark.sql("select * from data_analysis1 order by cust_id,date,amount")
df3.show()

+----------+----------+---------------+----------+-------+
|   cust_id|start_date|       trans_id|      date| amount|
+----------+----------+---------------+----------+-------+
|C000TDGP4R|2017-07-01|TCN8DU7IIPXXITZ|2017-08-27| 293.81|
|C000TDGP4R|2017-07-01|T6JCPBW0DEZCW5D|2017-08-27| 528.93|
|C000TDGP4R|2017-07-01|TA0BSXWF1TMQA13|2017-09-14| 739.58|
|C000TDGP4R|2017-07-01|THF2IHDCNFWHSCE|2017-09-26| 755.15|
|C000TDGP4R|2017-07-01|TBIOCTWZS3MMR8X|2017-10-07| 564.97|
|C000TDGP4R|2017-07-01|TLFXR81YMOXMZS6|2017-10-08| 605.41|
|C000TDGP4R|2017-07-01|T65VY93AG2FYFVW|2018-01-22|  946.4|
|C000TDGP4R|2017-07-01|TT51BP8LD0KE44S|2018-02-21|1246.69|
|C000TDGP4R|2017-07-01|TZO7NNC5K2BW398|2018-03-17|1075.41|
|C000TDGP4R|2017-07-01|TSS6UJZ3DNKH4KC|2018-06-12| 703.56|
|C000TDGP4R|2017-07-01|TNQ7WIKQITGMYLZ|2018-07-23| 604.31|
|C000TDGP4R|2017-07-01|TAFN40488EWOJVS|2018-07-24| 673.62|
|C000TDGP4R|2017-07-01|TP84BK2DZW5OUV3|2018-08-16|1072.94|
|C000TDGP4R|2017-07-01|T64UPWEDWUI4KHK|2018-10-10|  402.

In [None]:
df3.createOrReplaceTempView("data_analysis3")

spark.sql("SELECT * FROM data_analysis3").write.csv("1st.csv") # writes a directory named 1st.csv with one part file per partition (no header row)

In [71]:
# coalesce(1) merges the data into a single partition, so Spark writes one part file (still inside a directory named savings.csv)
df3.coalesce(1).write.format('csv').option('header', 'true').save('savings.csv')


In [None]:
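from pyspark.sql import functions as F

# Count nulls in the date column; the boolean from isnull() is cast to int because
# sum() only accepts numeric columns (importing sum directly would also shadow Python's built-in)
null_counts = df3.select(F.sum(F.isnull('date').cast('int')).alias('date_nulls'))
null_counts.collect()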