In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import *

In [None]:
# One SparkSession for the whole notebook; getOrCreate() returns the already-active
# session if one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Clean Typeform Data")
    .getOrCreate()
)

In [90]:
# Load the raw Typeform export.
# - multiline: lets a quoted field span several physical lines.
# - quote/escape '"': a doubled "" inside a quoted field is read as one literal quote.
# - header: set exactly once. It was previously given both as an option and as a
#   csv() keyword, which is redundant and can drift out of sync.
# - PERMISSIVE: malformed records have their bad fields set to null instead of
#   failing the whole read.
TYPEFORM_RESPONSES_CSV = "../data/typeform_responses.csv"

typeform_responses_df = (
    spark.read
    .option("multiline", "true")
    .option("quote", '"')
    .option("escape", '"')
    .option("header", True)
    .option("mode", "PERMISSIVE")
    .csv(TYPEFORM_RESPONSES_CSV)
)

### Answers Fact Table

In [91]:
from pyspark.sql.functions import regexp_replace

# Collapse every doubled double-quote ("") in the raw answers text into a single quote (")
# so the payload can later be parsed as JSON. The raw `answers` column itself is kept.
unescaped_answers = regexp_replace("answers", '""', '"')
typeform_responses_df = typeform_responses_df.withColumn("answers_clean", unescaped_answers)

In [92]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def clean_json_string(s):
    """Normalise a raw answers string so it can be parsed as JSON.

    Trims surrounding whitespace, then removes the whole run of leading/trailing
    double quotes (presumably the quotes wrapping the JSON payload in the export).

    Args:
        s: The raw string, or None.

    Returns:
        The cleaned string, or None when the input is None, empty, whitespace-only,
        or contains nothing but quotes/whitespace.
    """
    if s is None:
        return None
    # Strip whitespace before quotes. Otherwise ' "[...]" ' keeps its quotes and
    # from_json sees a JSON string literal instead of an array.
    cleaned = s.strip().strip('"').strip()
    # Return None (not "") when nothing is left, so empty payloads become null uniformly.
    return cleaned or None

# Wrap the Python cleaner as a Spark UDF that returns a string column, then apply it
# to the quote-unescaped answers to get the JSON text that from_json will parse.
clean_json_udf = udf(clean_json_string, returnType=StringType())

typeform_responses_df = typeform_responses_df.withColumn(
    "answers_fixed",
    clean_json_udf(col("answers_clean")),
)


In [93]:
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, DoubleType, BooleanType, MapType
from pyspark.sql.functions import col, from_json

# Schema for the `answers` JSON array: one struct per answered question.
# Per the Typeform Responses API, each answer stores its value under the key named by
# its `type` (text, email, number, choice, choices, url, ...). Keys absent from a given
# answer simply parse as null, so listing extra value keys here is safe.
# NOTE(review): confirm the answer keys against an actual export.
answer_schema = ArrayType(StructType([
    # The question this answer belongs to.
    StructField("field", StructType([
        StructField("id", StringType()),
        StructField("type", StringType()),
        StructField("ref", StringType())
    ])),
    StructField("type", StringType()),
    StructField("text", StringType()),
    StructField("email", StringType()),
    StructField("number", DoubleType()),
    StructField("boolean", BooleanType()),
    StructField("date", StringType()),
    # Single-select answer: the chosen label, or free text in `other`.
    StructField("choice", StructType([
        StructField("label", StringType()),
        StructField("other", StringType())
    ])),
    # Multi-select answer. This field was commented out before, so these answers were
    # dropped at parse time.
    # NOTE(review): the answers_f coalesce does not yet read `choices` or `url`; add e.g.
    # concat_ws(", ", col("answers.choices.labels")) there to surface them.
    StructField("choices", StructType([
        StructField("labels", ArrayType(StringType())),
        StructField("other", StringType())
    ])),
    StructField("phone_number", StringType()),
    StructField("file_url", StringType()),
    # Website-question answers.
    StructField("url", StringType())
]))

In [94]:
# Parse the cleaned answers JSON into an array of structs following `answer_schema`.
# The following cells treat a null `answers_parsed` as "not parsed".
parsed_answers = from_json(col("answers_fixed"), answer_schema)
typeform_responses_df = typeform_responses_df.withColumn("answers_parsed", parsed_answers)


In [None]:
# Sanity check: show up to 5 rows with no parsed answers (a parse failure, or an empty/null
# input), together with the raw answers text for debugging.
unparsed_rows = typeform_responses_df.filter(col("answers_parsed").isNull())
unparsed_rows.select("landing_id", "answers").show(n=5, truncate=False)


In [None]:
# Sanity check: show up to 5 rows whose answers were parsed successfully.
parsed_rows = typeform_responses_df.where(col("answers_parsed").isNotNull())
parsed_rows.select("landing_id", "answers_parsed").show(n=5, truncate=False)

In [97]:
from pyspark.sql.functions import explode, coalesce

# One row per answer: explode the parsed array into a struct column named `answers`.
# In the exploded frame this replaces the raw `answers` string column. explode() emits no
# rows for a null or empty array, so responses without parsed answers drop out here.
typeform_responses_df_exploded = typeform_responses_df.withColumn(
    "answers",
    explode(col("answers_parsed")),
)

In [None]:
# Preview the (un-exploded) responses frame including the helper columns: 20 rows, truncated cells.
# NOTE(review): this sits right after the explode cell; possibly typeform_responses_df_exploded
# was the intended frame — confirm.
typeform_responses_df.show(n=20, truncate=True)

In [99]:
# Answers fact table: one row per (landing_id, question), with the answer flattened to a string.
# Presumably only one value key is populated per answer, so the first non-null candidate wins.
# The candidate order is significant and matches the original coalesce.
# NOTE(review): number answers are doubles, so the string cast yields e.g. "5.0" for 5.
answer_value_candidates = [
    col("answers.text"),
    col("answers.email"),
    col("answers.number").cast("string"),
    col("answers.boolean").cast("string"),
    col("answers.date"),
    col("answers.phone_number"),
    col("answers.file_url"),
    col("answers.choice.label"),
    col("answers.choice.other"),
]

answers_f = typeform_responses_df_exploded.select(
    col("landing_id"),
    col("answers.field.id").alias("question_id"),
    col("answers.field.ref").alias("question_ref"),
    col("answers.type").alias("answer_type"),
    coalesce(*answer_value_candidates).alias("answer_value"),
)

In [None]:
# Preview the answers fact table (first 20 rows, truncated cells).
answers_f.show(20)

### Submissions Fact Table

In [101]:
# Schema for the `metadata` JSON column: every key is read as a string.
# In this notebook only `referer` is selected downstream.
metadata_fields = ["user_agent", "platform", "referer", "network_id", "browser"]
metadata_schema = StructType(
    [StructField(field_name, StringType()) for field_name in metadata_fields]
)

In [102]:
# Parse the metadata JSON string into a struct following `metadata_schema`.
parsed_metadata = from_json(col("metadata"), metadata_schema)
typeform_responses_df = typeform_responses_df.withColumn("metadata_parsed", parsed_metadata)

In [103]:
# Submissions fact table: one row per response, with the raw timing columns and the
# referer URL pulled out of the parsed metadata (the output column is named `referer`).
submission_columns = [
    col("landing_id"),
    col("landed_at"),
    col("submitted_at"),
    col("ingest_date"),
    col("metadata_parsed.referer"),
]
submissions_f = typeform_responses_df.select(*submission_columns)

In [104]:
from pyspark.sql.functions import to_timestamp, col

# Parse the string timestamps into TimestampType columns.
# landed_at / submitted_at end in "Z" (UTC). Parse that suffix with the zone-offset letters
# `XXX` rather than the quoted literal 'Z'. A quoted literal is matched and then discarded,
# so the wall-clock time would be interpreted in the Spark session time zone, silently
# shifting every timestamp whenever that zone is not UTC.
ISO_UTC_PATTERN = "yyyy-MM-dd'T'HH:mm:ssXXX"
# NOTE(review): ingest_date carries no zone information, so it is still interpreted in the
# session time zone. Confirm which zone the ingestion process writes.
INGEST_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss"

submissions_f = (
    submissions_f
    .withColumn("landed_at_ts", to_timestamp(col("landed_at"), ISO_UTC_PATTERN))
    .withColumn("submitted_at_ts", to_timestamp(col("submitted_at"), ISO_UTC_PATTERN))
    .withColumn("ingest_date_ts", to_timestamp(col("ingest_date"), INGEST_DATE_PATTERN))
)

In [None]:
# Show raw and parsed timestamps side by side to confirm the parse patterns matched
# (a null *_ts next to a non-null raw value means the pattern did not match).
timestamp_check_columns = [
    "landing_id",
    "landed_at", "landed_at_ts",
    "submitted_at", "submitted_at_ts",
    "ingest_date", "ingest_date_ts",
    "referer",
]
submissions_f.select(*timestamp_check_columns).show(truncate=False)

In [106]:
# The raw string timestamps are superseded by the parsed *_ts columns.
raw_timestamp_columns = ("landed_at", "submitted_at", "ingest_date")
submissions_f = submissions_f.drop(*raw_timestamp_columns)

In [107]:
from pyspark.sql.functions import expr

# Extract attribution parameters from the referer URL's query string into their own
# columns. parse_url returns null when the key is absent or the referer is null.
# Columns are added in dict order.
referer_param_exprs = {
    "typeform_source": "parse_url(referer, 'QUERY', 'typeform-source')",
    "utm_source": "parse_url(referer, 'QUERY', 'utm_source')",
    "utm_medium": "parse_url(referer, 'QUERY', 'utm_medium')",
    "utm_campaign": "parse_url(referer, 'QUERY', 'utm_campaign')",
}
for output_column, sql_expression in referer_param_exprs.items():
    submissions_f = submissions_f.withColumn(output_column, expr(sql_expression))


In [108]:
# `referer` is no longer needed once its query parameters have been extracted.
columns_to_drop = ["referer"]
submissions_f = submissions_f.drop(*columns_to_drop)

In [None]:
# Preview the final submissions fact table (first 20 rows, truncated cells).
submissions_f.show(n=20)

### Writing Tables to Parquet

In [None]:
# Persist both fact tables as Parquet, replacing any previous output at the same paths.
# Written in dict order: answers first, then submissions.
fact_tables = {
    "../data/answers_f": answers_f,
    "../data/submissions_f": submissions_f,
}
for output_path, fact_df in fact_tables.items():
    fact_df.write.mode("overwrite").parquet(output_path)
# NOTE(review): dim_questions is not built in this notebook. When it is added, write it to
# "../data/dim_questions". The old commented-out call used "data/dim_questions", which
# resolves to a different directory than the other outputs.

In [111]:
# Shut down the SparkSession created at the top of the notebook; run only after the writes above finish.
spark.stop()