# Import Libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import concat,lit

# Project 1 : Created Order Table

![image-2.png](attachment:image-2.png)

## Read Table from CSV

In [2]:
spark = SparkSession.builder.appName('ProjectSpark').getOrCreate()

In [9]:
import os
source = 'data'
list_table = os.listdir(source)
list_table = [s.replace('.csv', '') for s in list_table]
list_table.remove('archive')
list_table

['albums',
 'artists',
 'customers',
 'employees',
 'genres',
 'invoices',
 'invoice_items',
 'media_types',
 'playlists',
 'playlist_track',
 'tracks']

In [10]:
for x,i in zip(list_table,list_table):
    globals()[x] = spark.read.csv(f'data/{i}.csv',header=True,inferSchema=True)

In [13]:
playlists = playlists.withColumnRenamed('Name','PlaylistName')
tracks = tracks.withColumnRenamed('Name','ProductName') \
        .drop('UnitPrice')
artists = artists.withColumnRenamed('Name','ArtistName')
genres = genres.withColumnRenamed('Name','GenreName')
employees = employees.withColumn('EmployeeName',\
        concat(employees['FirstName'],lit(' '),employees['LastName']))
employees = employees.select('EmployeeId','EmployeeName')

## Transformation

In [14]:
order = invoice_items.join \
(invoices,invoice_items['InvoiceId']==invoices['InvoiceId'],'left')\
.drop('InvoiceLineId','InvoiceId')
order = order.join \
(tracks,order['TrackId']==tracks['TrackId'],'left').drop('TrackId')
order = order.join \
(albums,order['AlbumId']==albums['AlbumId'],'left').drop('AlbumId')
order = order.join \
(artists,order['ArtistId']==artists['ArtistId'],'left').drop('ArtistId')
order = order.join \
(media_types,order['MediaTypeId']==media_types['MediaTypeId'],'left').drop('MediaTypeId')
order = order.join \
(genres,order['GenreId']==genres['GenreId'],'left').drop('GenreId')
order = order.join \
(customers,order['CustomerId']==customers['CustomerId'],'left').drop('CustomerId')
order = order.join \
(employees,order['SupportRepId']==employees['EmployeeId']).drop('SupportRepId')

## Write to Parquet

In [15]:
order.write.mode('overwrite').parquet("order.parquet")

In [16]:
spark.read.parquet("order.parquet",header=True,inferSchema=True).limit(5).show()

+---------+--------+-------------------+--------------------+-----------+------------+--------------+-----------------+-----+--------------------+--------------------+------------+-------+--------------------+----------+--------------------+---------+---------+--------+-------+--------------------+---------+-----+-------+----------+----------------+----+--------------------+----------+-------------+
|UnitPrice|Quantity|        InvoiceDate|      BillingAddress|BillingCity|BillingState|BillingCountry|BillingPostalCode|Total|         ProductName|            Composer|Milliseconds|  Bytes|               Title|ArtistName|                Name|GenreName|FirstName|LastName|Company|             Address|     City|State|Country|PostalCode|           Phone| Fax|               Email|EmployeeId| EmployeeName|
+---------+--------+-------------------+--------------------+-----------+------------+--------------+-----------------+-----+--------------------+--------------------+------------+-------+------

## Write to PostgresSQL

In [17]:
spark.version

'3.4.0'

In [18]:
mode = "overwrite"
url = "jdbc:postgresql://localhost/postgres"
properties = {"user": "postgres","password": "postgres","driver": "org.postgresql.Driver"}
order.write.jdbc(url=url, table="order_table", mode=mode, properties=properties)

# Project 2 : MNIST Fashion Dataset

![image-2.png](attachment:image-2.png)

## Convert ubyte to PNG

In [31]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import concat,lit

In [32]:
# Imports
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
           .appName('SparkByExamples.com') \
.config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
           .config("spark.jars", "mysql-connector-java-8.0.13.jar").getOrCreate()

In [33]:
import os
import struct
import sys

from array import array
from os import path

import png

In [34]:
path = "C:/Users/Hanif/Data Science/Data Engineer/Pyspark/Pyspark-for-Data-Engineer/data/archive"

In [35]:
def read(dataset = "training", path = "C:/Users/Hanif/Data Science/Data Engineer/Pyspark/Pyspark-for-Data-Engineer/data/archive"):
    if dataset is "training":
        fname_img = os.path.join(path, 'train-images-idx3-ubyte')
        fname_img = fname_img.replace('\\','/')
        fname_lbl = os.path.join(path, 'train-labels-idx1-ubyte')
        fname_lbl = fname_lbl.replace('\\','/')
    elif dataset is "testing":
        fname_img = os.path.join(path, 't10k-images-idx3-ubyte')
        fname_img = fname_img.replace('\\','/')
        fname_lbl = os.path.join(path, 't10k-labels-idx1-ubyte')
        fname_lbl = fname_lbl.replace('\\','/')
    else:
        raise ValueError("dataset must be 'testing' or 'training'")

    flbl = open(fname_lbl, 'rb')
    magic_nr, size = struct.unpack(">II", flbl.read(8))
    lbl = array("b", flbl.read())
    flbl.close()

    fimg = open(fname_img, 'rb')
    magic_nr, size, rows, cols = struct.unpack(">IIII", fimg.read(16))
    img = array("B", fimg.read())
    fimg.close()

    return lbl, img, size, rows, cols

def write_dataset(labels, data, size, rows, cols, output_dir):
    # create output directories
    output_dirs = [
        os.path.join(output_dir, str(i))
        for i in range(10)
    ]

    # write data
    for (i, label) in enumerate(labels):
        output_filename = os.path.join(output_dirs[label], str(i) + ".png")
        print("writing " + output_filename)
        with open(output_filename, "wb") as h:
            w = png.Writer(cols, rows, greyscale=True)
            data_i = [
                data[ (i*rows*cols + j*cols) : (i*rows*cols + (j+1)*cols) ]
                for j in range(rows)
            ]
            w.write(h, data_i)

  if dataset is "training":
  elif dataset is "testing":


In [None]:
input_path = 'C:/Users/Hanif/Data Science/Data Engineer/Pyspark/Pyspark-for-Data-Engineer/data/archive'
output_path = 'C:/Users/Hanif/Data Science/Data Engineer/Pyspark/Pyspark-for-Data-Engineer/data/archive/output'
for dataset in ["training", "testing"]:
    labels, data, size, rows, cols = read(dataset, input_path)
    write_dataset(labels, data, size, rows, cols,
                  os.path.join(output_path, dataset))

## Data Preprocessing with Python and Pyspark

In [37]:
import os
path = 'C:/Users/Hanif/Data Science/Data Engineer/Pyspark/Pyspark-for-Data-Engineer/data/archive/output'
train = 'training'
test = 'testing'
train_path = os.path.join(path,train).replace('\\','/')
test_path = os.path.join(path,test).replace('\\','/')
print('train: ',len(os.listdir(train_path)))
print('test : ',len(os.listdir(test_path )))

train:  10
test :  10


In [38]:
for i in (os.listdir(train_path)):
    a = os.path.join(train_path,i).replace('\\','/')
    print(f'jumlah data training {i} ',len(os.listdir(a)))

jumlah data training 0  6000
jumlah data training 1  6000
jumlah data training 2  6000
jumlah data training 3  6000
jumlah data training 4  6000
jumlah data training 5  6000
jumlah data training 6  6000
jumlah data training 7  6000
jumlah data training 8  6000
jumlah data training 9  6000


In [39]:
for i in (os.listdir(test_path)):
    a = os.path.join(test_path,i).replace('\\','/')
    print(f'jumlah data training {i} ',len(os.listdir(a)))

jumlah data training 0  1000
jumlah data training 1  1000
jumlah data training 2  1000
jumlah data training 3  1000
jumlah data training 4  1000
jumlah data training 5  1000
jumlah data training 6  1000
jumlah data training 7  1000
jumlah data training 8  1000
jumlah data training 9  1000


In [40]:
test_data = []
test_category = []
for i in (os.listdir(test_path)):
    a = os.path.join(test_path,i).replace('\\','/')
    b = os.listdir(a)
    for k in b:
        c = os.path.join(a,k).replace('\\','/')
        test_category.append(i)
        test_data.append(c)

In [41]:
train_data = []
train_category = []
for i in (os.listdir(train_path)):
    a = os.path.join(train_path,i).replace('\\','/')
    b = os.listdir(a)
    for k in b:
        train_category.append(i)
        c = os.path.join(a,k).replace('\\','/')
        train_data.append(c)

In [42]:
category = train_category+test_category
image_path = train_data+test_data

In [43]:
print('jumlah image path',len(image_path))
print('jumlah category',len(category))

jumlah image path 70000
jumlah category 70000


### Create Spark Dataframe

In [44]:
data = list(zip(category,image_path))

In [45]:
import pandas as pd

In [46]:
pandas_df = pd.DataFrame(data,columns = ['category','image_path'])
pandas_df

Unnamed: 0,category,image_path
0,0,C:/Users/Hanif/Data Science/Data Engineer/Pysp...
1,0,C:/Users/Hanif/Data Science/Data Engineer/Pysp...
2,0,C:/Users/Hanif/Data Science/Data Engineer/Pysp...
3,0,C:/Users/Hanif/Data Science/Data Engineer/Pysp...
4,0,C:/Users/Hanif/Data Science/Data Engineer/Pysp...
...,...,...
69995,9,C:/Users/Hanif/Data Science/Data Engineer/Pysp...
69996,9,C:/Users/Hanif/Data Science/Data Engineer/Pysp...
69997,9,C:/Users/Hanif/Data Science/Data Engineer/Pysp...
69998,9,C:/Users/Hanif/Data Science/Data Engineer/Pysp...


In [47]:
df = spark.createDataFrame(pandas_df)

## Write to MySQL

In [50]:
from sqlalchemy import create_engine

import pymysql

import pandas as pd

In [66]:
user = 'hanif'
pw = '12345'
db = 'data'
engine = create_engine(f"mysql+pymysql://{user}:{pw}@MSI/{db}"
                       .format(user="root",
                               pw="12345",
                               db="data"))

In [68]:
pandas_df.to_sql('image_data', con = engine, if_exists = 'append', chunksize = 1000)

70000

# Project 3 Fact Table : Movie Sellings

![image.png](attachment:image.png)

## Read Table from MySQL

In [2]:
spark = SparkSession.builder \
           .appName('SparkByExamples.com') \
           .config("spark.jars", "mysql-connector-java-8.0.13.jar").getOrCreate()

In [3]:
payment= spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/sakila") \
    .option("dbtable", "payment") \
    .option("user", "root") \
    .option("password", "12345") \
    .load()

In [4]:
list_table = spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/sakila") \
    .option("query", "SELECT TABLE_NAME FROM information_schema.TABLES WHERE TABLE_SCHEMA = 'sakila'") \
    .option("user", "root") \
    .option("password", "12345") \
    .load()

In [5]:
all_table = list(list_table.toPandas()['TABLE_NAME'])

In [6]:
all_table

['actor',
 'actor_info',
 'address',
 'category',
 'city',
 'country',
 'customer',
 'customer_list',
 'film',
 'film_actor',
 'film_category',
 'film_list',
 'film_text',
 'inventory',
 'language',
 'nicer_but_slower_film_list',
 'payment',
 'rental',
 'sales_by_film_category',
 'sales_by_store',
 'staff',
 'staff_list',
 'store']

In [7]:
for x,i in zip(all_table,all_table):
    globals()[x] = spark.read\
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/sakila") \
    .option("dbtable", f"{i}") \
    .option("user", "root") \
    .option("password", "12345") \
    .load()

In [10]:
master_table= spark.read \
    .format("jdbc") \
    .option("driver","com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://localhost:3306/sakila") \
    .option("query", "SELECT  A.payment_id,A.customer_id,E.address_id,A.staff_id,F.store_id,A.rental_id,B.inventory_id,C.film_id,D.language_id,A.amount,A.payment_date FROM sakila.payment AS A JOIN sakila.rental AS B ON A.rental_id = B.rental_id JOIN sakila.inventory AS C ON B.inventory_id = C.inventory_id JOIN sakila.film AS D ON C.film_id = D.film_id JOIN sakila.customer AS E ON A.customer_id = E.customer_id JOIN sakila.staff AS F ON A.staff_id = F.staff_id") \
    .option("user", "root") \
    .option("password", "12345") \
    .load()

In [11]:
master_table.show()

+----------+-----------+----------+--------+--------+---------+------------+-------+-----------+------+-------------------+
|payment_id|customer_id|address_id|staff_id|store_id|rental_id|inventory_id|film_id|language_id|amount|       payment_date|
+----------+-----------+----------+--------+--------+---------+------------+-------+-----------+------+-------------------+
|     14568|        541|       547|       1|       1|    12894|        4581|   1000|          1|  7.99|2005-08-19 03:49:28|
|      6095|        226|       230|       1|       1|     6712|        4581|   1000|          1|  9.99|2005-07-12 13:24:47|
|      6321|        235|       239|       1|       1|     1493|        4581|   1000|          1|  4.99|2005-06-15 21:50:32|
|      5832|        215|       219|       1|       1|      711|        4581|   1000|          1|  4.99|2005-05-29 03:49:03|
|      8849|        327|       332|       1|       1|    15916|        4580|   1000|          1|  9.99|2005-08-23 17:56:01|
|      7