In [2]:
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Spark submit arguments: the local MySQL Connector/J jar is passed both as the
# driver class path and via --jars, followed by the 'pyspark-shell' token.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--driver-class-path /home/marcelo/libs/mysql-connector-java-8.0.19.jar '
    '--jars /home/marcelo/libs/mysql-connector-java-8.0.19.jar '
    'pyspark-shell'
)

import findspark

# Also hand findspark the connector's Maven coordinates (group:artifact:version).
findspark.add_packages('mysql:mysql-connector-java:8.0.19')

In [3]:
# Create (or reuse) the notebook's SparkSession: app name 'POC ETL', master local[*].
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('POC ETL')
    .getOrCreate()
)

In [6]:
# Load the person export as CSV, using the first line as the header row.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load("/home/marcelo/data/person-out.csv")
)
df

DataFrame[id: string, lastname: string, firstname: string, phones: string]

### Person ETL

In [14]:
# Project the person columns and rename the name fields to snake_case
# (firstname -> first_name, lastname -> last_name).
person_df = df.select(
    df["id"],
    df["firstname"].alias("first_name"),
    df["lastname"].alias("last_name"),
)
person_df

DataFrame[id: string, first_name: string, last_name: string]

In [15]:
# Append the person rows to the `person` table of `mydatabase` over JDBC.
#
# Driver class: the notebook loads mysql-connector-java 8.0.19, where the
# driver class is `com.mysql.cj.jdbc.Driver`; the legacy name
# `com.mysql.jdbc.Driver` is deprecated in Connector/J 8.0.
#
# NOTE(review): user/password are hard-coded here; consider supplying them from
# outside the notebook before sharing it.
# NOTE(review): mode('append') adds the rows again on every run of this cell.
person_df.write.format('jdbc').options(
    url='jdbc:mysql://mysql:3306/mydatabase',
    driver='com.mysql.cj.jdbc.Driver',
    dbtable='person',
    user='mama',
    password='mama',
).mode('append').save()

### Phone ETL

In [16]:
# Mapper String -> Json Object -> Attributes Json
import json
def parse_phone_json(array_str):
    """
    Parse the loosely formatted phones string and yield one pair per phone.

    The text is handled as a JSON-like array of objects whose keys and values
    are not double-quoted, e.g. ``[{id: abc, phone_number: 123}]``. Keys and
    values are wrapped in double quotes so the text can be decoded as JSON.

    input: array_str: string holding the list of phones; ``None`` or a blank
           string is treated as "no phones" and yields nothing.
    yields: ``(id, phone_number)`` tuples, with ``phone_number`` cast to int.
    raises: ``ValueError`` (including ``json.JSONDecodeError``) when the quoted
            text is not valid JSON or a phone number is not numeric;
            ``KeyError`` when an entry lacks ``id`` or ``phone_number``.
    """
    # Nothing to parse for a null/blank cell. Without this guard re.sub raises
    # TypeError on None and json.loads fails on an empty string.
    if array_str is None or not array_str.strip():
        return

    # Wrap every `key: value` pair in double quotes -> `"key": "value"`.
    json_str = re.sub(r'([a-zA-Z0-9_]+)\s*:\s*([^,}]+)?', '"\\1": "\\2"', array_str)
    # Values that were already single-quoted now look like "'value'";
    # collapse each `"'` / `'"` pair into a single double quote.
    json_str = re.sub(r'(\"\')|(\'\")', '"', json_str)

    for item in json.loads(json_str):
        # Extract the phone id and the phone number (as an integer).
        yield item["id"], int(item["phone_number"])

        
# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructType, StructField
# Array of (id, phone_number) structs; used as the return type of the phone UDF.
# NOTE(review): IntegerType is a 32-bit integer in Spark -- confirm that real
# phone numbers always fit in that range.
phone_json_schema = ArrayType(
    StructType(
        [
            StructField('id', StringType(), nullable=False),
            StructField('phone_number', IntegerType(), nullable=False),
        ]
    )
)

# Define udf
from pyspark.sql.functions import udf
# Expose parse_phone_json as a Spark UDF whose result type is phone_json_schema.
udf_phone_parse_json = udf(
    f=lambda raw_phones: parse_phone_json(raw_phones),
    returnType=phone_json_schema,
)

In [17]:
# Keep only the person id and the raw phones string for the phone pipeline.
phones_df = df.select("id", "phones")
phones_df

DataFrame[id: string, phones: string]

In [18]:
# Replace the raw phones string with the parsed array of (id, phone_number) structs.
df_json_phones = phones_df.withColumn(
    "phones",
    udf_phone_parse_json(phones_df["phones"]),
)
print(df_json_phones)

DataFrame[id: string, phones: array<struct<id:string,phone_number:int>>]


In [19]:
# Person id plus the parsed phones array.
only_phones_df = df_json_phones.select("id", "phones")

In [20]:
# Preview the parsed rows (show()'s defaults written out: 20 rows, truncated cells).
only_phones_df.show(n=20, truncate=True)

+--------------------+--------------------+
|                  id|              phones|
+--------------------+--------------------+
|4b544bb1-5596-4d9...|[[4b544bb1-5596-4...|
|95534b58-674c-491...|[[95534b58-674c-4...|
|0154863c-33e0-431...|[[0154863c-33e0-4...|
|8387a1ad-1ef7-48f...|[[8387a1ad-1ef7-4...|
|c11daf2b-e0c0-4b0...|[[c11daf2b-e0c0-4...|
|59135d5a-d674-473...|[[59135d5a-d674-4...|
|ded9487d-dbe3-41f...|[[ded9487d-dbe3-4...|
|9af46608-0a11-46d...|[[9af46608-0a11-4...|
|a764efc6-4636-4e8...|[[a764efc6-4636-4...|
|7fab3692-a4e1-405...|[[7fab3692-a4e1-4...|
|e165548f-bdb6-419...|[[e165548f-bdb6-4...|
|4416f2e9-b0e3-4cf...|[[4416f2e9-b0e3-4...|
|d4ab95f2-d8d6-4f9...|[[d4ab95f2-d8d6-4...|
|5739b31a-d300-43d...|[[5739b31a-d300-4...|
|da0fa305-896a-4af...|[[da0fa305-896a-4...|
|ebe2b887-fec7-498...|[[ebe2b887-fec7-4...|
|52b0f3a4-53e4-4c3...|[[52b0f3a4-53e4-4...|
|42ace0db-332b-4f7...|[[42ace0db-332b-4...|
|bf297ff3-cb52-46e...|[[bf297ff3-cb52-4...|
|ea35f044-bd4a-490...|[[ea35f044

In [22]:
from pyspark.sql.functions import explode, explode_outer, posexplode
# One output row per phone: posexplode emits the array position (`pos`) and the
# phone struct (`col`) next to the owning person's id.
exploded_phones = only_phones_df.select(
    only_phones_df["id"],
    posexplode(only_phones_df["phones"]),
)
exploded_phones.printSchema()
exploded_phones.show()

root
 |-- id: string (nullable = true)
 |-- pos: integer (nullable = false)
 |-- col: struct (nullable = true)
 |    |-- id: string (nullable = false)
 |    |-- phone_number: integer (nullable = false)

+--------------------+---+--------------------+
|                  id|pos|                 col|
+--------------------+---+--------------------+
|4b544bb1-5596-4d9...|  0|[4b544bb1-5596-4d...|
|4b544bb1-5596-4d9...|  1|[035c8240-3b2c-40...|
|95534b58-674c-491...|  0|[95534b58-674c-49...|
|95534b58-674c-491...|  1|[fee0eb4f-0701-4c...|
|0154863c-33e0-431...|  0|[0154863c-33e0-43...|
|0154863c-33e0-431...|  1|[486ac361-7216-40...|
|8387a1ad-1ef7-48f...|  0|[8387a1ad-1ef7-48...|
|8387a1ad-1ef7-48f...|  1|[e1778f30-ef6f-47...|
|c11daf2b-e0c0-4b0...|  0|[c11daf2b-e0c0-4b...|
|c11daf2b-e0c0-4b0...|  1|[e55f0adf-ca56-4b...|
|59135d5a-d674-473...|  0|[59135d5a-d674-47...|
|59135d5a-d674-473...|  1|[67a058b8-4f84-41...|
|ded9487d-dbe3-41f...|  0|[ded9487d-dbe3-41...|
|ded9487d-dbe3-41f...|  1|[01

In [23]:
# Rename the exploded columns so the person key and the phone struct are explicit.
exploded_phones = (
    exploded_phones
    .withColumnRenamed("id", "person_id")
    .withColumnRenamed("col", "phone")
)
# Flatten the struct into the columns written to the phone table.
phone_df = exploded_phones.select("person_id", "phone.id", "phone.phone_number")
phone_df.show()

+--------------------+--------------------+------------+
|           person_id|                  id|phone_number|
+--------------------+--------------------+------------+
|4b544bb1-5596-4d9...|4b544bb1-5596-4d9...|      123456|
|4b544bb1-5596-4d9...|035c8240-3b2c-40b...|      123456|
|95534b58-674c-491...|95534b58-674c-491...|      123456|
|95534b58-674c-491...|fee0eb4f-0701-4c3...|      123456|
|0154863c-33e0-431...|0154863c-33e0-431...|      123456|
|0154863c-33e0-431...|486ac361-7216-40a...|      123456|
|8387a1ad-1ef7-48f...|8387a1ad-1ef7-48f...|      123456|
|8387a1ad-1ef7-48f...|e1778f30-ef6f-47e...|      123456|
|c11daf2b-e0c0-4b0...|c11daf2b-e0c0-4b0...|      123456|
|c11daf2b-e0c0-4b0...|e55f0adf-ca56-4b0...|      123456|
|59135d5a-d674-473...|59135d5a-d674-473...|      123456|
|59135d5a-d674-473...|67a058b8-4f84-415...|      123456|
|ded9487d-dbe3-41f...|ded9487d-dbe3-41f...|      123456|
|ded9487d-dbe3-41f...|01c75caa-7aac-481...|      123456|
|9af46608-0a11-46d...|9af46608-

In [24]:
# Append the flattened phone rows to the `phone` table of `mydatabase` over JDBC.
#
# Driver class: the notebook loads mysql-connector-java 8.0.19, where the
# driver class is `com.mysql.cj.jdbc.Driver`; the legacy name
# `com.mysql.jdbc.Driver` is deprecated in Connector/J 8.0.
#
# NOTE(review): user/password are hard-coded here; consider supplying them from
# outside the notebook before sharing it.
# NOTE(review): mode('append') adds the rows again on every run of this cell.
phone_df.write.format('jdbc').options(
    url='jdbc:mysql://mysql:3306/mydatabase',
    driver='com.mysql.cj.jdbc.Driver',
    dbtable='phone',
    user='mama',
    password='mama',
).mode('append').save()