# Let's start

In [1]:
#We will import two libraries: SparkSession and SQLContext.
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql import types 
from pyspark.sql.types import IntegerType
if __name__ == '__main__':
    scSpark = SparkSession \
            .builder \
            .appName("simple_ETL_project") \
            .getOrCreate()

In [2]:
#Reading csv file
data_file = '/home/shashank/Documents/Sample Datas/supermarket_sales.csv'
sdfData = scSpark.read.csv(data_file, header=True, sep=",").cache()
print('Total Records = {}'.format(sdfData.count()))
sdfData.show(10)

Total Records = 1000
+-----------+------+---------+-------------+------+--------------------+----------+--------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity|
+-----------+------+---------+-------------+------+--------------------+----------+--------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|
|631-41-3108|     A|   Yangon|       Normal|  Male|  Home and lifestyle|     46.33|       7|
|123-19-1176|     A|   Yangon|       Member|  Male|   Health and beauty|     58.22|       8|
|373-73-7910|     A|   Yangon|       Normal|  Male|   Sports and travel|     86.31|       7|
|699-14-3026|     C|Naypyitaw|       Normal|  Male|Electronic access...|     85.39|       7|
|355-53-5943|     A|   Yangon|       Member|Female|Electronic access...|     68.84|       6|
|315-22-5665|     C|Naypyitaw|       Normal|Femal

### EXTRACT part of the ETL here

In [3]:
#EXTRACT part of the ETL here

data_file = '/home/shashank/Documents/Sample Datas/supermarket_sales.csv'
sdfData = scSpark.read.csv(data_file, header=True, sep=",").cache()
gender = sdfData.groupBy('Gender').count()

print("Total number of male & Female customers:-")
print(gender.show())

filtered_data = sdfData.filter(sdfData['Gender']=="Male")
def sum_col(df, col):
    return df.select(F.sum(col)).collect()[0][0]
total_quantity_M = sum_col(filtered_data, 'Quantity')
total_quantity_F = sum_col(sdfData.filter(sdfData['Gender']=="Female"), 'Quantity')

print("Total Quantity purchased by male:-", total_quantity_M)
print("Total Quantity purchased by female:-", total_quantity_F)

Total number of male & Female customers:-
+------+-----+
|Gender|count|
+------+-----+
|Female|  501|
|  Male|  499|
+------+-----+

None
Total Quantity purchased by male:- 2641.0
Total Quantity purchased by female:- 2869.0



### TRANSFORM part of the ETL here

In [4]:
sdfData.registerTempTable("sales")
output =  scSpark.sql('SELECT * from sales')
output.show(5)

+-----------+------+---------+-------------+------+--------------------+----------+--------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity|
+-----------+------+---------+-------------+------+--------------------+----------+--------+
|750-67-8428|     A|   Yangon|       Member|Female|   Health and beauty|     74.69|       7|
|226-31-3081|     C|Naypyitaw|       Normal|Female|Electronic access...|     15.28|       5|
|631-41-3108|     A|   Yangon|       Normal|  Male|  Home and lifestyle|     46.33|       7|
|123-19-1176|     A|   Yangon|       Member|  Male|   Health and beauty|     58.22|       8|
|373-73-7910|     A|   Yangon|       Normal|  Male|   Sports and travel|     86.31|       7|
+-----------+------+---------+-------------+------+--------------------+----------+--------+
only showing top 5 rows



In [5]:
output = scSpark.sql('SELECT * from sales WHERE `Unit Price` < 15 AND Quantity < 10')
output.show(5)

+-----------+------+---------+-------------+------+--------------------+----------+--------+
| Invoice ID|Branch|     City|Customer type|Gender|        Product line|Unit price|Quantity|
+-----------+------+---------+-------------+------+--------------------+----------+--------+
|351-62-0822|     B| Mandalay|       Member|Female| Fashion accessories|     14.48|       4|
|871-39-9221|     C|Naypyitaw|       Normal|Female|Electronic access...|     12.45|       6|
|586-25-0848|     A|   Yangon|       Normal|Female|   Sports and travel|     12.34|       7|
|389-25-3394|     C|Naypyitaw|       Normal|  Male|Electronic access...|     11.81|       5|
|279-62-1445|     C|Naypyitaw|       Member|Female| Fashion accessories|     12.54|       1|
+-----------+------+---------+-------------+------+--------------------+----------+--------+
only showing top 5 rows



In [6]:
output = scSpark.sql('SELECT COUNT(*) as total, City from sales GROUP BY City')
output.show()

+-----+---------+
|total|     City|
+-----+---------+
|  328|Naypyitaw|
|  332| Mandalay|
|  340|   Yangon|
+-----+---------+




### Finally the LOAD part of the ETL

In [7]:
output.write.format('json').save('filtered_data.json')

In [8]:
output.coalesce(1).write.format('json').save('filteredData_in_singleFile.json')

In [9]:
#MySQL & Apache Spark Integration

scSpark = SparkSession \
        .builder \
        .appName("reading csv") \
        .config("spark.driver.extraClassPath", "/usr/local/spark/jars/mysql-connector-java-5.1.49-bin.jar") \
        .getOrCreate()


#Before executing this create database(eg:spark_test_db) and table(eg:city_info) 
output = scSpark.sql('SELECT COUNT(*) as total, City from sales GROUP BY City')
output.show()
output.write.format('jdbc').options(
    url='jdbc:mysql://localhost/spark_test_db',
    driver='com.mysql.jdbc.Driver',
    dbtable='city_info',
    user='root',
    password='hivepassword').mode('append').save()

+-----+---------+
|total|     City|
+-----+---------+
|  328|Naypyitaw|
|  332| Mandalay|
|  340|   Yangon|
+-----+---------+

