<a href="https://colab.research.google.com/github/tyNAKAMOL/Simple-ETL-Pyspark/blob/main/ETL_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# install Java 8 (headless JDK); apt's output is discarded
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# download Spark 3.0.0 built for Hadoop 3.2 (change the version number if needed;
# SPARK_HOME and the pinned pyspark version in later cells must match it)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unpack the Spark archive into the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [2]:
# Register the Java and Spark install locations in the process environment
# so that later cells can locate them.
import os

os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.0.0-bin-hadoop3.2",
)

In [3]:
# install findspark with pip (-q keeps pip's output quiet)
!pip install -q findspark

In [None]:
# install the MySQL driver that later cells import as `mysql.connector`
!pip install mysql-connector-python

In [None]:
!pip install pyspark==3.0.0 # the pyspark version and the Spark version should be the same

In [12]:
# Let findspark locate the Spark installation (SPARK_HOME is set in an earlier
# cell) so that `pyspark` can be imported by the following cells.
import findspark
findspark.init() 

In [None]:
# Import SparkSession
from pyspark.sql import SparkSession

# Configure the builder step by step: local master using all available cores,
# then the application name, and finally create (or reuse) the session.
session_builder = SparkSession.builder
session_builder = session_builder.master("local[*]")
session_builder = session_builder.appName("simple etl job")
spark = session_builder.getOrCreate()

# Leave the session as the last expression so the notebook displays its summary.
spark

**Extract**

In [26]:
import os

import mysql.connector
import pandas as pd


# Config MySQL connection.
# Every setting can be overridden with an environment variable so that real
# credentials never need to be written into the notebook; the fallbacks are the
# original demo values.
host = os.environ.get('MYSQL_HOST', '0.tcp.ap.ngrok.io')
database = os.environ.get('MYSQL_DATABASE', 'etl')
user = os.environ.get('MYSQL_USER', 'root')
password = os.environ.get('MYSQL_PASSWORD', '')
port = int(os.environ.get('MYSQL_PORT', 18578))

file_path ="/content/sample_data/raw_demo.csv"

# Read the pipe-delimited CSV (first line is the header) before touching the
# database, so no connection is held open while Spark is working.
df = spark.read.option("sep", "|").option("header", "true").csv(file_path)
pandas_df = df.toPandas()

# Build the parameter tuples once; missing values (None/NaN) are sent as SQL NULL.
source_columns = ['First Name', 'Last Name', 'Age', 'Sex']
rows = [
    tuple(None if pd.isna(value) else value for value in record)
    for record in pandas_df[source_columns].astype(object).itertuples(index=False, name=None)
]

# connection MySQL
connection = mysql.connector.connect(
    user=user,
    database=database,
    password=password,
    host=host,
    port=port
)

# NOTE(review): the table is only ever appended to, so re-running this cell
# inserts the same rows again.
try:
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS SOURCE_DATA_DEMO(first_name VARCHAR(50), last_name VARCHAR(50), age VARCHAR(50), sex VARCHAR(50));")
    sql = "INSERT INTO SOURCE_DATA_DEMO (first_name,last_name,age,sex) VALUES (%s, %s, %s, %s)"

    # One batched call instead of one execute() round-trip per row.
    if rows:
        cursor.executemany(sql, rows)

    connection.commit()
finally:
    # Always release the connection, even when the insert fails part-way.
    connection.close()



**Transform**

In [None]:
import os

import mysql.connector
import pandas as pd

# Config MySQL connection.
# Every setting can be overridden with an environment variable so that real
# credentials never need to be written into the notebook; the fallbacks are the
# original demo values.
host = os.environ.get('MYSQL_HOST', '0.tcp.ap.ngrok.io')
database = os.environ.get('MYSQL_DATABASE', 'etl')
user = os.environ.get('MYSQL_USER', 'root')
password = os.environ.get('MYSQL_PASSWORD', '')
port = int(os.environ.get('MYSQL_PORT', 18578))

# connection MySQL
connection = mysql.connector.connect(
    user=user,
    database=database,
    password=password,
    host=host,
    port=port
)

# Pull the whole source table into pandas. This is a read-only query, so no
# cursor of our own and no commit are needed; the connection is closed even if
# the query raises.
try:
    pdf = pd.read_sql("SELECT * FROM SOURCE_DATA_DEMO", con=connection)
finally:
    connection.close()

# Hand the rows to Spark for the transform steps and preview them.
dftransform = spark.createDataFrame(pdf)
dftransform.show()


In [17]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import initcap

# Name columns go through three steps, composed into a single expression:
#   1. drop every character that is not an ASCII letter, a digit or a space
#   2. remove whitespace
#   3. upper-case the first character
name_columns = ['first_name', 'last_name']
for column_name in name_columns:
    without_symbols = regexp_replace(column_name, r'[^a-zA-Z0-9 ]', '')
    without_spaces = regexp_replace(without_symbols, ' ', '')
    dftransform = dftransform.withColumn(column_name, initcap(without_spaces))

# The sex column only has its whitespace removed.
dftransform = dftransform.withColumn('sex', regexp_replace('sex', ' ', ''))

dftransform.show()

+----------+----------+---+------+
|first_name| last_name|age|   sex|
+----------+----------+---+------+
|     Maria|Loudermilk| 54|    MF|
|    Eugene|    Travis| 23|  LGBT|
|   Dolores|   Heywood| 35|  both|
|      Rita|     White| 57|     -|
|    Corina|    Milton| 10|  Male|
|     Alice|    Farrer| 37|    FM|
|     Jacob|   Lemoine| 60|  both|
|    Robert|   Fischer| 48|  both|
|     Gayle|   Deberry| 58|     f|
|     Lloyd|     Brown| 45|     M|
|     Kayla|     Mejia| 41|    MF|
|    Arnold|    Powell| 65|  Male|
|     Susan|   Monahan|187|Female|
|   Winfred|    Vitale| 53|     -|
|    Sergio|      Chun| 40|  LGBT|
|  Jonathan|     Flynn| 70|    MF|
|   Shirley|    Wilson| 67|  both|
|     Karen|     Weuve| 22|     m|
|     Naomi|    Aleman| 49|   Man|
|   Jerrold|  Martinez| 58|     f|
+----------+----------+---+------+
only showing top 20 rows



In [19]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# drop Age Error
# The age column arrives as a string. Cast it to integer FIRST: with Spark's
# default (non-ANSI) settings a value that cannot be parsed becomes NULL, and
# only then can the null filter remove it. Filtering before the cast (as was
# done previously) let unparseable ages through as NULLs.
dft2 = dftransform.withColumn("age", col("age").cast(IntegerType()))
dft2 = dft2.filter(col("age").isNotNull())
# NOTE(review): numerically valid but implausible ages (e.g. 187) are still
# kept; add a range filter here if those should count as errors too.
dft2.show()
dft2.printSchema()

+----------+----------+---+------+
|first_name| last_name|age|   sex|
+----------+----------+---+------+
|     Maria|Loudermilk| 54|    MF|
|    Eugene|    Travis| 23|  LGBT|
|   Dolores|   Heywood| 35|  both|
|      Rita|     White| 57|     -|
|    Corina|    Milton| 10|  Male|
|     Alice|    Farrer| 37|    FM|
|     Jacob|   Lemoine| 60|  both|
|    Robert|   Fischer| 48|  both|
|     Gayle|   Deberry| 58|     f|
|     Lloyd|     Brown| 45|     M|
|     Kayla|     Mejia| 41|    MF|
|    Arnold|    Powell| 65|  Male|
|     Susan|   Monahan|187|Female|
|   Winfred|    Vitale| 53|     -|
|    Sergio|      Chun| 40|  LGBT|
|  Jonathan|     Flynn| 70|    MF|
|   Shirley|    Wilson| 67|  both|
|     Karen|     Weuve| 22|     m|
|     Naomi|    Aleman| 49|   Man|
|   Jerrold|  Martinez| 58|     f|
+----------+----------+---+------+
only showing top 20 rows

root
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sex: 

In [20]:
from pyspark.sql.functions import when, col

# (canonical label, raw spellings) pairs, checked in this order; any value not
# listed here is left unchanged.
sex_column = col('sex')
sex_replacements = [
    ('F', ['Female', 'Girl', 'f']),
    ('M', ['Man', 'Male', 'FM', 'MF']),
    ('LGBT', ['both']),
    ('M', ['m']),
]

first_label, first_spellings = sex_replacements[0]
normalized_sex = when(sex_column.isin(first_spellings), first_label)
for label, spellings in sex_replacements[1:]:
    normalized_sex = normalized_sex.when(sex_column.isin(spellings), label)

# A lone dash means the value was not provided.
normalized_sex = normalized_sex.when(sex_column == '-', 'Not Defined')

dft2 = dft2.withColumn('sex', normalized_sex.otherwise(sex_column))
dft2.show()

+----------+----------+---+-----------+
|first_name| last_name|age|        sex|
+----------+----------+---+-----------+
|     Maria|Loudermilk| 54|          M|
|    Eugene|    Travis| 23|       LGBT|
|   Dolores|   Heywood| 35|       LGBT|
|      Rita|     White| 57|Not Defined|
|    Corina|    Milton| 10|          M|
|     Alice|    Farrer| 37|          M|
|     Jacob|   Lemoine| 60|       LGBT|
|    Robert|   Fischer| 48|       LGBT|
|     Gayle|   Deberry| 58|          F|
|     Lloyd|     Brown| 45|          M|
|     Kayla|     Mejia| 41|          M|
|    Arnold|    Powell| 65|          M|
|     Susan|   Monahan|187|          F|
|   Winfred|    Vitale| 53|Not Defined|
|    Sergio|      Chun| 40|       LGBT|
|  Jonathan|     Flynn| 70|          M|
|   Shirley|    Wilson| 67|       LGBT|
|     Karen|     Weuve| 22|          M|
|     Naomi|    Aleman| 49|          M|
|   Jerrold|  Martinez| 58|          F|
+----------+----------+---+-----------+
only showing top 20 rows



In [21]:
import os

import mysql.connector
import pandas as pd


# Config MySQL connection.
# Every setting can be overridden with an environment variable so that real
# credentials never need to be written into the notebook; the fallbacks are the
# original demo values.
host = os.environ.get('MYSQL_HOST', '0.tcp.ap.ngrok.io')
database = os.environ.get('MYSQL_DATABASE', 'etl')
user = os.environ.get('MYSQL_USER', 'root')
password = os.environ.get('MYSQL_PASSWORD', '')
port = int(os.environ.get('MYSQL_PORT', 18578))

# Collect the transformed rows before opening the connection so it is held
# only for the duration of the write.
pandas_df = dft2.toPandas()

# Build the parameter tuples once. Casting to object keeps plain Python values,
# and missing values (None/NaN) are sent as SQL NULL instead of a float NaN.
staging_columns = ['first_name', 'last_name', 'age', 'sex']
rows = [
    tuple(None if pd.isna(value) else value for value in record)
    for record in pandas_df[staging_columns].astype(object).itertuples(index=False, name=None)
]

# connection MySQL
connection = mysql.connector.connect(
    user=user,
    database=database,
    password=password,
    host=host,
    port=port
)

# NOTE(review): "STATING" looks like a typo for "STAGING"; the name is kept
# because the Load cell reads from the same table name.
# NOTE(review): the table is only ever appended to, so re-running this cell
# inserts the same rows again.
try:
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS STATING_DATA_DEMO(first_name VARCHAR(50), last_name VARCHAR(50), age VARCHAR(50), sex VARCHAR(50));")
    sql = "INSERT INTO STATING_DATA_DEMO (first_name,last_name,age,sex) VALUES (%s, %s, %s, %s)"

    # One batched call instead of one execute() round-trip per row.
    if rows:
        cursor.executemany(sql, rows)

    connection.commit()
finally:
    # Always release the connection, even when the insert fails part-way.
    connection.close()

**Load**

In [None]:
import os

import mysql.connector
import pandas as pd


# Config MySQL connection.
# Every setting can be overridden with an environment variable so that real
# credentials never need to be written into the notebook; the fallbacks are the
# original demo values.
host = os.environ.get('MYSQL_HOST', '0.tcp.ap.ngrok.io')
database = os.environ.get('MYSQL_DATABASE', 'etl')
user = os.environ.get('MYSQL_USER', 'root')
password = os.environ.get('MYSQL_PASSWORD', '')
port = int(os.environ.get('MYSQL_PORT', 18578))

# connection MySQL
connection = mysql.connector.connect(
    user=user,
    database=database,
    password=password,
    host=host,
    port=port
)

# NOTE(review): "STATING" looks like a typo for "STAGING"; the name is kept
# because the staging cell writes to the same table name.
# NOTE(review): the target table is only ever appended to, so re-running this
# cell inserts the same rows again.
try:
    # Read the staged rows, pass them through Spark and back to pandas.
    pdf = pd.read_sql("SELECT * FROM STATING_DATA_DEMO", con=connection)
    dfload = spark.createDataFrame(pdf)
    pandas_df = dfload.toPandas()

    # Build the parameter tuples once; missing values (None/NaN) become SQL NULL.
    load_columns = ['first_name', 'last_name', 'age', 'sex']
    rows = [
        tuple(None if pd.isna(value) else value for value in record)
        for record in pandas_df[load_columns].astype(object).itertuples(index=False, name=None)
    ]

    cursor = connection.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS LOAD_DATA_DEMO(first_name VARCHAR(50), last_name VARCHAR(50), age VARCHAR(50), sex VARCHAR(50));")
    sql = "INSERT INTO LOAD_DATA_DEMO (first_name,last_name,age,sex) VALUES (%s, %s, %s, %s)"

    # One batched call instead of one execute() round-trip per row.
    if rows:
        cursor.executemany(sql, rows)

    connection.commit()
finally:
    # Always release the connection, even when the read or the insert fails.
    connection.close()
