### Setup and Installation

https://towardsdatascience.com/create-your-first-etl-pipeline-in-apache-spark-and-python-ec3d12e2c169 

### Imports

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext 

### SparkSession

SparkSession is the entry point for programming Spark applications. It lets you interact with the Dataset and DataFrame APIs provided by Spark. We set the application name by calling appName. The getOrCreate() method returns the existing SparkSession if one is already running, and creates a new one otherwise.

In [2]:

if __name__ == '__main__':
    scSpark = SparkSession \
        .builder \
        .appName("reading csv") \
        .getOrCreate()

22/07/14 19:40:31 WARN Utils: Your hostname, Ubi20 resolves to a loopback address: 127.0.1.1; using 192.168.0.5 instead (on interface wlp2s0)
22/07/14 19:40:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/14 19:40:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Read CSV files

I set the file path and then call .read.csv to read the CSV file. The .cache() call keeps the resulting DataFrame in memory once it has been computed, so repeated actions on it (such as count() followed by show()) do not re-read the file.

In [3]:
data_file = 'data.csv'
sdfData = scSpark.read.csv(data_file, header=True, sep=",").cache()
print('Total Records = {}'.format(sdfData.count()))
sdfData.show()

Total Records = 4
+------+----+--------+
| name |age | country|
+------+----+--------+
| adnan|  40|Pakistan|
|  maaz|   9|Pakistan|
|  musa|   4|Pakistan|
|ayesha|  32|Pakistan|
+------+----+--------+



Reading multiple files: the pattern data*.csv matches every CSV file whose name starts with data, and Spark loads all of them into a single DataFrame. This only works if all the CSVs share the same schema, not if one of them has different column names.

In [4]:
data_file = 'data*.csv'
sdfData = scSpark.read.csv(data_file, header=True, sep=",").cache()
print('Total Records = {}'.format(sdfData.count()))
sdfData.show()

Total Records = 8
+-------+----+--------+
|  name |age | country|
+-------+----+--------+
| noreen|  23| England|
|  Aamir|   9|Pakistan|
|  Noman|   4|Pakistan|
|Rasheed|  12|Pakistan|
|  adnan|  40|Pakistan|
|   maaz|   9|Pakistan|
|   musa|   4|Pakistan|
| ayesha|  32|Pakistan|
+-------+----+--------+
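
Because the wildcard read assumes that every matching file shares one schema, it can help to compare the header rows before handing the pattern to Spark. The following is a minimal sketch using only the standard library. The helper name `find_header_mismatches` is hypothetical (it is not part of Spark), and it compares header rows only, not column types.

```python
import csv
import glob


def find_header_mismatches(pattern):
    """Return {path: header} for files whose header differs from the first file's.

    Hypothetical helper: it only compares the first row of each CSV.
    """
    paths = sorted(glob.glob(pattern))
    if not paths:
        return {}

    headers = {}
    for path in paths:
        with open(path, newline="") as handle:
            # The first row is the header; an empty file yields an empty header.
            headers[path] = next(csv.reader(handle), [])

    # The alphabetically first file serves as the reference schema.
    reference = headers[paths[0]]
    return {path: header for path, header in headers.items() if header != reference}
```

An empty result from `find_header_mismatches('data*.csv')` suggests the files can be read together as in the cell above.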



### SparkSQL

You can perform many operations with the DataFrame API, but Spark also provides a more familiar interface for manipulating data: SparkSQL, which lets you use SQL-like queries to get the desired results. Historically SQLContext was the gateway to SparkSQL; in this notebook the SparkSession itself exposes it through scSpark.sql. Before we try SQL queries, let’s try to group records by Gender. We are dealing with the EXTRACT part of the ETL here.

In [5]:
data_file = 'supermarket_sales.csv'
sdfData = scSpark.read.csv(data_file, header=True, sep=",").cache()

gender = sdfData.groupBy('Gender').count()
gender.show()

+------+-----+
|Gender|count|
+------+-----+
|Female|  501|
|  Male|  499|
+------+-----+


SparkSQL allows you to use SQL-like queries to access the data. First, we create a temporary view out of the DataFrame. registerTempTable used to serve this purpose, but it is deprecated, so createOrReplaceTempView is used instead. In our case the view name is sales. Once it’s done you can use typical SQL queries on it. In our case it is SELECT * FROM sales, limited to 5 rows.

In [6]:
# registerTempTable is deprecated; createOrReplaceTempView replaces it
sdfData.createOrReplaceTempView("sales")
output = scSpark.sql('SELECT * from sales limit 5')
output.show()

+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity| Tax 5%|   Total|     Date| Time|    Payment|  cogs|gross margin percentage|gross income|Rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+-------+--------+---------+-----+-----------+------+-----------------------+------------+------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|26.1415|548.9715| 1/5/2019|13:08|    Ewallet|522.83|            4.761904762|     26.1415|   9.1|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|   3.82|   80.22| 3/8/2019|10:29|       Cash|  76.4|            4.761904762|        3.82|   9.6|
|631-41-3108|     A|   Yangon|       Normal|  Male|  Ho

In [7]:
output = scSpark.sql('SELECT * from sales WHERE `Unit price` < 15 AND Quantity < 10 limit 5')
output.show()

+-----------+------+---------+-------------+------+--------------------+----------+--------+------+-------+---------+-----+-----------+-----+-----------------------+------------+------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity|Tax 5%|  Total|     Date| Time|    Payment| cogs|gross margin percentage|gross income|Rating|
+-----------+------+---------+-------------+------+--------------------+----------+--------+------+-------+---------+-----+-----------+-----+-----------------------+------------+------+
|351-62-0822|     B| Mandalay|       Member|Female| Fashion accessories|     14.48|       4| 2.896| 60.816| 2/6/2019|18:07|    Ewallet|57.92|            4.761904762|       2.896|   4.5|
|871-39-9221|     C|Naypyitaw|       Normal|Female|Electronic access...|     12.45|       6| 3.735| 78.435| 2/9/2019|13:11|       Cash| 74.7|            4.761904762|       3.735|   4.1|
|586-25-0848|     A|   Yangon|       Normal|Female|   Sports and trave

Or even aggregated values:

In [8]:
output = scSpark.sql('SELECT COUNT(*) as total, City from sales GROUP BY City')
output.show()

+-----+---------+
|total|     City|
+-----+---------+
|  328|Naypyitaw|
|  332| Mandalay|
|  340|   Yangon|
+-----+---------+
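
The aggregation query is plain SQL, so its logic can be sanity-checked on a few hand-written rows before it is run through Spark. The sketch below uses Python's built-in sqlite3 as a stand-in engine. This is an assumption of the example: SQLite is not Spark and the dialects differ in places. The three rows are made up for illustration, and an ORDER BY is added only to make the result order deterministic.

```python
import sqlite3


def count_by_city(cities):
    """Run the GROUP BY query from the cell above on an in-memory SQLite table."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE sales (City TEXT)")
        conn.executemany("INSERT INTO sales VALUES (?)", [(city,) for city in cities])
        query = "SELECT COUNT(*) AS total, City FROM sales GROUP BY City ORDER BY City"
        return conn.execute(query).fetchall()
    finally:
        conn.close()


# Made-up sample rows, not taken from supermarket_sales.csv.
print(count_by_city(["Yangon", "Naypyitaw", "Yangon"]))
# prints [(1, 'Naypyitaw'), (2, 'Yangon')]
```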



### Saving, Loading

Finally, the LOAD part of the ETL. What if you want to save this transformed data? You have many options available, such as an RDBMS, XML or JSON.

In [9]:
# output.write.format('json').save('filtered.json')

It creates a folder with the name of the file, in our case filtered.json. Inside it, a file named _SUCCESS marks that the write completed; if the job fails, this marker is not written. You will also find multiple part files there. The reason is that each partition of the DataFrame is written by its own task, so every worker involved in the operation writes its own file. If you want a single file (which is not recommended), coalesce can be used to reduce the data from all partitions to a single partition before writing.

In [10]:
# Note: this still creates a folder, as above; coalesce(1) only reduces its contents to a single part file
# output.coalesce(1).write.format('json').save('filtered_single.json') 

It outputs the following data:

{"total":328,"City":"Naypyitaw"} <br>
{"total":332,"City":"Mandalay"} <br>
{"total":340,"City":"Yangon"}
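
The output directory described above can also be read back without Spark, which is handy for a quick check of what was written. This is a minimal sketch with a hypothetical helper name, `read_spark_json_dir`. It assumes uncompressed output in which each part file holds one JSON object per line, as in the records shown above.

```python
import json
from pathlib import Path


def read_spark_json_dir(output_dir):
    """Load all records from a Spark JSON output directory.

    Assumes a _SUCCESS marker plus one or more uncompressed part-* files,
    each containing one JSON object per line.
    """
    directory = Path(output_dir)
    if not (directory / "_SUCCESS").exists():
        raise FileNotFoundError(f"no _SUCCESS marker in {directory}")

    records = []
    # The part-* pattern does not match hidden side files such as .part-*.crc.
    for part in sorted(directory.glob("part-*")):
        with part.open() as handle:
            for line in handle:
                line = line.strip()
                if line:
                    records.append(json.loads(line))
    return records
```

Calling `read_spark_json_dir('filtered.json')` would return a list of dictionaries, one per output row.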

### MySQL and Apache Spark Integration

The above DataFrame contains the transformed data. We would like to load this data into MySQL for further use, such as visualization or display in an app.

First, we need the MySQL connector library so that Spark can talk to MySQL. We will download the connector from the MySQL website and put it in a folder. We will amend SparkSession to include the JAR file:

In [11]:
# scSpark = SparkSession \
#         .builder \
#         .appName("reading csv") \
#         .config("spark.driver.extraClassPath", "/usr/local/spark/jars/mysql-connector-java-8.0.16.jar") \
#         .getOrCreate()

In [12]:
# output = scSpark.sql('SELECT COUNT(*) as total, City from sales GROUP BY City')
# output.show()
# output.write.format('jdbc').options(
#         url='jdbc:mysql://localhost/spark',
#         driver='com.mysql.cj.jdbc.Driver',
#         dbtable='city_info',
#         user='root',
#         password='root').mode('append').save()