###Setup and Extract

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract,regexp_replace, concat, lit, lower, date_format
from pyspark.sql.types import StringType

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("DQThon Participants ETL") \
    .getOrCreate()

In [0]:
df_participant = spark.read.csv('s3://dqthon-hackathon-data/dqthon-participants.csv', header=True, inferSchema=True)

# Check schema and show first few rows
df_participant.printSchema()
df_participant.show(5)

root
 |-- participant_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- country: double (nullable = true)
 |-- institute: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- register_time: string (nullable = true)

+--------------------+------------+------------------+-----------+--------------------+--------------------+-------------+---------+----------+-------------+
|      participant_id|  first_name|         last_name| birth_date|             address|        phone_number|      country|institute|occupation|register_time|
+--------------------+------------+------------------+-----------+--------------------+--------------------+-------------+---------+----------+-------------+
|bd9b6f88-b84f-4c4...|       Citra|        Nurdiyanti|05 Feb 1991|Gg. Monginsidi No...|            

##Transformations

####Transform Part I - Postal Code

In [0]:
# Extract postal code using regex
df_participant = df_participant.withColumn("postal_code", regexp_extract(col("address"), r'(\d+)$', 0))

df_participant.printSchema()
df_participant.show(5)

root
 |-- participant_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- country: double (nullable = true)
 |-- institute: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- register_time: string (nullable = true)
 |-- postal_code: string (nullable = true)

+--------------------+------------+------------------+-----------+--------------------+--------------------+-------------+---------+----------+-------------+-----------+
|      participant_id|  first_name|         last_name| birth_date|             address|        phone_number|      country|institute|occupation|register_time|postal_code|
+--------------------+------------+------------------+-----------+--------------------+--------------------+-------------+---------+----------+-------------+-----------+
|bd9b6f88-b84f-4c4..

####Transform Part II - City

In [0]:
# Extract city using regex
df_participant = df_participant.withColumn("city", regexp_extract(col("address"), r'(?<=\n)(\w.+)(?=,)', 0))

df_participant.printSchema()
df_participant.show(5)

root
 |-- participant_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- country: double (nullable = true)
 |-- institute: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- register_time: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)

+--------------------+------------+------------------+-----------+--------------------+--------------------+-------------+---------+----------+-------------+-----------+----+
|      participant_id|  first_name|         last_name| birth_date|             address|        phone_number|      country|institute|occupation|register_time|postal_code|city|
+--------------------+------------+------------------+-----------+--------------------+--------------------+-------------+---------+----------+-

#### Transform Part III - Github Profile

In [0]:
# Create github_profile URL
df_participant = df_participant.withColumn("github_profile", concat(lit("https://github.com/"), lower(col("first_name")), lower(col("last_name"))))

df_participant.printSchema()
df_participant.show(5)

root
 |-- participant_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- country: double (nullable = true)
 |-- institute: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- register_time: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- github_profile: string (nullable = true)

+--------------------+------------+------------------+-----------+--------------------+--------------------+-------------+---------+----------+-------------+-----------+----+--------------------+
|      participant_id|  first_name|         last_name| birth_date|             address|        phone_number|      country|institute|occupation|register_time|postal_code|city|      github_profile|
+--------------------+------------+------------------+--

#### Transform Part IV - Cleaned Phone Number

In [0]:
# Clean phone number format
df_participant = df_participant.withColumn("cleaned_phone_number", regexp_replace(col("phone_number"), r'^(\+62|62)', '0'))
df_participant = df_participant.withColumn("cleaned_phone_number", regexp_replace(col("cleaned_phone_number"), r'[()-]', ''))
df_participant = df_participant.withColumn("cleaned_phone_number", regexp_replace(col("cleaned_phone_number"), r'\s+', ''))

df_participant.printSchema()
df_participant.show(5)

root
 |-- participant_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- country: double (nullable = true)
 |-- institute: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- register_time: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- github_profile: string (nullable = true)
 |-- cleaned_phone_number: string (nullable = true)

+--------------------+------------+------------------+-----------+--------------------+--------------------+-------------+---------+----------+-------------+-----------+----+--------------------+--------------------+
|      participant_id|  first_name|         last_name| birth_date|             address|        phone_number|      country|institute|occupation|register_time|postal_code|city|     

#### Transform Part V - Team Name

In [0]:
# Function to create team name
def create_team_name(first_name, last_name, country, institute):
    if first_name is None or last_name is None or country is None:
        return "unknown"

    abbrev_name = (first_name[0] if first_name else '') + (last_name[0] if last_name else '')
    if institute:
        abbrev_institute = ''.join([word[0] for word in institute.split()])
    else:
        abbrev_institute = "unknown"
    
    return f"{abbrev_name}-{country}-{abbrev_institute}"

# Register the function as a UDF
create_team_name_udf = udf(create_team_name, StringType())

# Apply the UDF
df_participant = df_participant.withColumn("team_name", create_team_name_udf(col("first_name"), col("last_name"), col("country"), col("institute")))

df_participant.printSchema()
df_participant.show(5)

root
 |-- participant_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- country: double (nullable = true)
 |-- institute: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- register_time: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- github_profile: string (nullable = true)
 |-- cleaned_phone_number: string (nullable = true)
 |-- team_name: string (nullable = true)

+--------------------+------------+------------------+-----------+--------------------+--------------------+-------------+---------+----------+-------------+-----------+----+--------------------+--------------------+--------------------+
|      participant_id|  first_name|         last_name| birth_date|             address|        phone_number|      cou

#### Transform Part VI - Email

In [0]:
# Function to create email
def create_email(first_name, last_name, institute, country):
    first_name_lower = first_name.lower() if first_name else 'unknown'
    last_name_lower = last_name.lower() if last_name else 'unknown'
    if institute:
        abbrev_institute = ''.join([word[0] for word in institute.split()]).lower()
        institute_lower = institute.lower()
    else:
        abbrev_institute = "unknown"
        institute_lower = ""

    if 'universitas' in institute_lower:
        if country:
            if len(country.split()) > 1:
                country_code = ''.join([word[0] for word in country.split()]).lower()
            else:
                country_code = country[:3].lower()
        else:
            country_code = "unknown"
        return f"{first_name_lower}{last_name_lower}@{abbrev_institute}.ac.{country_code}"
    return f"{first_name_lower}{last_name_lower}@{abbrev_institute}.com"

# Register the function as a UDF
create_email_udf = udf(create_email, StringType())

# Apply the UDF
df_participant = df_participant.withColumn("email", create_email_udf(col("first_name"), col("last_name"), col("institute"), col("country")))

df_participant.printSchema()
df_participant.show(5)


root
 |-- participant_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- country: double (nullable = true)
 |-- institute: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- register_time: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- github_profile: string (nullable = true)
 |-- cleaned_phone_number: string (nullable = true)
 |-- team_name: string (nullable = true)
 |-- email: string (nullable = true)

+--------------------+------------+------------------+-----------+--------------------+--------------------+-------------+---------+----------+-------------+-----------+----+--------------------+--------------------+--------------------+--------------------+
|      participant_id|  first_name|         last_name| birt

#### Transform Part VII - Birth Date

In [0]:
# Convert birth_date to desired format
df_participant = df_participant.withColumn("birth_date", date_format(col("birth_date"), "yyyy-MM-dd"))

df_participant.printSchema()
df_participant.show(5)

root
 |-- participant_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- country: double (nullable = true)
 |-- institute: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- register_time: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- github_profile: string (nullable = true)
 |-- cleaned_phone_number: string (nullable = true)
 |-- team_name: string (nullable = true)
 |-- email: string (nullable = true)

+--------------------+------------+------------------+----------+--------------------+--------------------+-------------+---------+----------+-------------+-----------+----+--------------------+--------------------+--------------------+--------------------+
|      participant_id|  first_name|         last_name|birth_

#### Transform Part VIII - Register Time

In [0]:
# Convert register_time to datetime format
df_participant = df_participant.withColumn("register_at", date_format((col("register_time").cast("timestamp")), "yyyy-MM-dd HH:mm:ss"))

df_participant.printSchema()
df_participant.show(5)

root
 |-- participant_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- country: double (nullable = true)
 |-- institute: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- register_time: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- github_profile: string (nullable = true)
 |-- cleaned_phone_number: string (nullable = true)
 |-- team_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- register_at: string (nullable = true)

+--------------------+------------+------------------+----------+--------------------+--------------------+-------------+---------+----------+-------------+-----------+----+--------------------+--------------------+--------------------+--------------------+-----------+
|    

## Load Data into SQL Database

In [0]:
# Save to SQL Database
df_participant.write.format("jdbc").options(
    url="jdbc:mysql://your-database-url:3306/your-database-name",
    driver="com.mysql.jdbc.Driver",
    dbtable="dqthon_participants",
    user="your-username",
    password="your-password"
).mode("overwrite").save()