In [1]:
import findspark

# Initialise findspark with the local Spark installation directory
# before any pyspark module is imported by the following cells.
spark_home = "/Users/max/Drive/spark"
findspark.init(spark_home)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
# Build the session configuration first, then start the session
# (or pick up one that is already running under this application).
session_builder = SparkSession.builder.appName("CreditCardSystem")
spark = session_builder.getOrCreate()

23/10/03 17:35:43 WARN Utils: Your hostname, Mahmouds-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.2 instead (on interface en0)
23/10/03 17:35:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/03 17:35:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Explicit schema for the customer JSON records, declared as
# (column name, Spark type) pairs in the order the columns should appear.
# Every column is nullable.
customer_fields = [
    ("FIRST_NAME", StringType()),
    ("MIDDLE_NAME", StringType()),
    ("LAST_NAME", StringType()),
    ("SSN", IntegerType()),
    ("CREDIT_CARD_NO", StringType()),
    ("APT_NO", StringType()),
    ("STREET_NAME", StringType()),
    ("CUST_CITY", StringType()),
    ("CUST_STATE", StringType()),
    ("CUST_COUNTRY", StringType()),
    ("CUST_ZIP", StringType()),
    ("CUST_PHONE", LongType()),
    ("CUST_EMAIL", StringType()),
    ("LAST_UPDATED", TimestampType()),
]
json_schema = StructType(
    [
        StructField(field_name, field_type, nullable=True)
        for field_name, field_type in customer_fields
    ]
)

# Read the customer JSON file from the data folder, applying the schema above
# instead of letting Spark infer the column types.
customer_json_path = '/Users/max/Drive/VS_Projects/cap_project/data/cdw_sapp_custmer.json'
customer_data = spark.read.json(customer_json_path, schema=json_schema)

23/10/03 17:35:59 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [36]:
# Preview the raw customer rows: first 20 rows, long cell values truncated
# (these are the defaults of `show`, written out explicitly).
customer_data.show(n=20, truncate=True)

+----------+-----------+---------+---------+----------------+------+-----------------+------------+----------+-------------+--------+----------+--------------------+-------------------+
|FIRST_NAME|MIDDLE_NAME|LAST_NAME|      SSN|  CREDIT_CARD_NO|APT_NO|      STREET_NAME|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|          CUST_EMAIL|       LAST_UPDATED|
+----------+-----------+---------+---------+----------------+------+-----------------+------------+----------+-------------+--------+----------+--------------------+-------------------+
|      Alec|         Wm|   Hooper|123456100|4210653310061055|   656|Main Street North|     Natchez|        MS|United States|   39120|   1237818| AHooper@example.com|2018-04-21 09:49:02|
|      Etta|    Brendan|   Holman|123453023|4210653310102868|   829|    Redwood Drive|Wethersfield|        CT|United States|   06109|   1238933| EHolman@example.com|2018-04-21 09:49:02|
|    Wilber|   Ezequiel|   Dunham|123454487|4210653310116272|   683| 1

In [50]:
# Apply Transformations Based on Mapping Document
transformed_data = customer_data.select(
    col("SSN").alias("SSN"),
    initcap(col("FIRST_NAME")).alias("FIRST_NAME"),
    lower(col("MIDDLE_NAME")).alias("MIDDLE_NAME"),
    initcap(col("LAST_NAME")).alias("LAST_NAME"),
    col("CREDIT_CARD_NO").alias("Credit_card_no"),
    concat_ws(", ", col("STREET_NAME"), col("APT_NO")).alias("FULL_STREET_ADDRESS"),
    col("CUST_CITY").alias("CUST_CITY"),
    col("CUST_STATE").alias("CUST_STATE"),
    col("CUST_COUNTRY").alias("CUST_COUNTRY"),
    col("CUST_ZIP").alias("CUST_ZIP"),
    regexp_replace(col("CUST_PHONE").cast("string"), r"(\d{3})(\d{4})", r"($1)$2").alias("CUST_PHONE"),
    col("CUST_EMAIL").alias("CUST_EMAIL"),
    col("LAST_UPDATED").alias("LAST_UPDATED")
)

In [51]:
# Preview the transformed rows: first 20 rows, long cell values truncated
# (these are the defaults of `show`, written out explicitly).
transformed_data.show(n=20, truncate=True)

+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+-------------------+
|      SSN|FIRST_NAME|MIDDLE_NAME|LAST_NAME|  Credit_card_no| FULL_STREET_ADDRESS|   CUST_CITY|CUST_STATE| CUST_COUNTRY|CUST_ZIP|CUST_PHONE|          CUST_EMAIL|       LAST_UPDATED|
+---------+----------+-----------+---------+----------------+--------------------+------------+----------+-------------+--------+----------+--------------------+-------------------+
|123456100|      Alec|         wm|   Hooper|4210653310061055|Main Street North...|     Natchez|        MS|United States|   39120| (123)7818| AHooper@example.com|2018-04-21 09:49:02|
|123453023|      Etta|    brendan|   Holman|4210653310102868|  Redwood Drive, 829|Wethersfield|        CT|United States|   06109| (123)8933| EHolman@example.com|2018-04-21 09:49:02|
|123454487|    Wilber|   ezequiel|   Dunham|4210653310116272|12th Street East,...|     Hun

In [52]:
# Print the column names, types and nullability of the transformed DataFrame
# to check the result of the select above.
transformed_data.printSchema()

root
 |-- SSN: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- Credit_card_no: string (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = false)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [54]:
# Load the JDBC connection settings. The file carries a .yaml extension but is
# parsed with Spark's JSON reader, so its contents must be valid JSON
# (possibly spanning several lines, hence the "multiline" option).
config = spark.read.option("multiline", "true").json("/Users/max/Drive/VS_Projects/cap_project/config/config.yaml")

# Fetch the settings row once instead of running a separate Spark action for
# each of the four values, and fail with a clear message when the file yields
# no row (first() returns None for an empty DataFrame).
jdbc_settings = config.select("url", "driver", "user", "password").first()
if jdbc_settings is None:
    raise ValueError("JDBC configuration is empty: no row could be read from config.yaml")

# Write the DataFrame to the RDBMS using the configuration.
# "overwrite" replaces any existing contents of the target table.
transformed_data.write \
    .format("jdbc") \
    .option("url", jdbc_settings["url"]) \
    .option("driver", jdbc_settings["driver"]) \
    .option("dbtable", "table010") \
    .option("user", jdbc_settings["user"]) \
    .option("password", jdbc_settings["password"]) \
    .mode("overwrite") \
    .save()