In [12]:
! python3 -m venv venv
! source venv/bin/activate
! pip3 install pyspark

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip[0m


In [13]:
from pyspark.sql.functions import col, explode,regexp_replace,split,from_json,min,max,datediff,to_date, when, current_date,sum,countDistinct,filter
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, BooleanType, TimestampType, DoubleType
from pyspark.sql import SparkSession

# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('Task3_subscription')
    .getOrCreate()
)

In [14]:
# Read schema for the raw file: three nullable string columns. The two history
# columns are kept as raw JSON text here and parsed with from_json further down.
df_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("store_id", "installHistory", "subscriptionHistory")
])

In [15]:
# Load the sample file using ";" as the field separator and the explicit string schema.
# NOTE(review): the CSVHeaderChecker warning in the cell outputs reports the header as
# the single field "store_id,installHistory,subscriptionHistory", i.e. with this
# delimiter each physical line lands whole in `store_id` and is split apart later.
datasample_df = (
    spark.read
    .format("csv")
    .option("delimiter", ";")
    .option("header", "true")
    .schema(df_schema)
    .load("files/dataset_sample.csv")
)


In [16]:
# Rewrite the raw text held in `store_id` in three passes, applied in this order:
#   1. `,"[`  ->  `;[`
#   2. `",[`  ->  `;[`
#   3. `""`   ->  `"`
# After this, ";" separates the fields so the line can be split on it.
raw_line = col('store_id')
raw_line = regexp_replace(raw_line, r',"\[', ';[')
raw_line = regexp_replace(raw_line, r'",\[', ';[')
raw_line = regexp_replace(raw_line, '""', '"')

datasample_transform_df = datasample_df.withColumn('store_id', raw_line)


In [17]:
# Split the normalised line on ";" once and project its first three pieces into
# the final columns: store id, install-history JSON, subscription-history JSON.
line_parts = split(col('store_id'), ';')

datasample_transform_df = datasample_transform_df.select(
    line_parts.getItem(0).alias('store_id'),
    line_parts.getItem(1).alias('installHistory'),
    line_parts.getItem(2).alias('subscriptionHistory'),
)


In [18]:
# Element layout of the `installHistory` JSON array (all fields nullable).
installHistory_schema = ArrayType(
    StructType([
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ('toNow', BooleanType()),
            ('startType', StringType()),
            ('fromDate', TimestampType()),
            ('endType', StringType()),
            ('toDate', TimestampType()),
        )
    ])
)

# Element layout of the `subscriptionHistory` JSON array (all fields nullable).
# `billingOnDate` is declared as a string, unlike the other date fields.
subscriptionHistory_schema = ArrayType(
    StructType([
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ('price', DoubleType()),
            ('planName', StringType()),
            ('fromDate', TimestampType()),
            ('toDate', TimestampType()),
            ('billingOnDate', StringType()),
        )
    ])
)

# Replace both JSON text columns with their parsed, typed array-of-struct form.
datasample_transform_df = (
    datasample_transform_df
    .withColumn("installHistory", from_json(col('installHistory'), installHistory_schema))
    .withColumn("subscriptionHistory", from_json(col('subscriptionHistory'), subscriptionHistory_schema))
)

In [19]:
# One output row per element of `installHistory`. `explode` produces no row for a
# store whose array is null or empty.
datasample_transform_flattened = datasample_transform_df.withColumn(
    "install", explode(col("installHistory"))
)

# Only the first element of `subscriptionHistory` is carried onto each install row.
first_subscription = col("subscriptionHistory").getItem(0)

df_datasample = datasample_transform_flattened.select(
    col("store_id"),
    col("install.toNow").alias("toNow"),
    col("install.startType").alias("startType"),
    col("install.fromDate").alias("fromDate"),
    col("install.endType").alias("endType"),
    # Closed periods (toNow is false) keep their recorded end date; open periods and
    # rows with a null flag fall back to today's date.
    # The flag is read from the struct directly rather than through the `toNow`
    # alias defined in this same select: alias-in-same-select only resolves on
    # Spark versions with lateral column alias support, and the flag is a boolean,
    # so no string comparison is needed.
    when(~col("install.toNow"), col("install.toDate"))
        .otherwise(current_date())
        .alias("toDate"),
    first_subscription.getField("price").alias("price"),
    first_subscription.getField("planName").alias("planName"),
    first_subscription.getField("fromDate").alias("sub_fromDate"),
    first_subscription.getField("toDate").alias("sub_toDate"),
    first_subscription.getField("billingOnDate").alias("billingOnDate"),
)



In [23]:
# Lifespan per store: number of days between the earliest install start and the
# latest period end. Periods with toNow set to True already carry the current date
# as `toDate`, so those stores are measured up to the current day.
# `min` / `max` here are the pyspark.sql.functions aggregates imported at the top,
# not the Python builtins.
first_start = min(to_date(col('fromDate')))
last_end = max(to_date(col('toDate')))

lifespan = df_datasample.groupBy('store_id').agg(
    first_start.alias('startDate'),
    last_end.alias('endDate'),
    # The original wrapped the `min(...)` argument in a stray .alias('endDate');
    # an alias on an inner argument has no effect on the result and mislabelled
    # the start date, so it is dropped.
    datediff(last_end, first_start).alias('lifespan_in_active'),
)
lifespan.show()


25/01/01 09:12:51 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: store_id,installHistory,subscriptionHistory
 Schema: store_id
Expected: store_id but found: store_id,installHistory,subscriptionHistory
CSV file: file:///Users/macos/Documents/Python/Project-CV/Clearerio/Clearerio_assignment/files/dataset_sample.csv


+----------+----------+----------+------------------+
|  store_id| startDate|   endDate|lifespan_in_active|
+----------+----------+----------+------------------+
|Store_5407|2022-05-30|2025-01-01|               947|
|Store_5125|2022-05-23|2023-03-07|               288|
|Store_4938|2022-05-17|2025-01-01|               960|
|Store_5390|2022-05-30|2025-01-01|               947|
|Store_5168|2022-05-24|2023-08-31|               464|
|Store_4960|2022-05-17|2025-01-01|               960|
|Store_4719|2022-05-10|2025-01-01|               967|
|Store_4435|2022-04-30|2025-01-01|               977|
|Store_5133|2022-05-23|2022-06-17|                25|
|Store_5067|2022-05-19|2025-01-01|               958|
|Store_4984|2022-05-18|2022-11-01|               167|
|Store_4882|2022-05-15|2022-05-17|                 2|
|Store_4779|2022-05-11|2025-01-01|               966|
|Store_4726|2022-05-10|2022-07-09|                60|
|Store_5315|2022-05-27|2025-01-01|               950|
|Store_4902|2022-05-16|2022-

In [21]:
# Total revenue by subscription date.
# Rows are de-duplicated on (store_id, Subscription_Date, price) first, because
# df_datasample holds one row per install period and would otherwise count the
# same store's subscription once per period.
revenue = df_datasample.select(
    col('store_id'),
    to_date(col('sub_fromDate')).alias('Subscription_Date'),
    col('price')
).distinct()

# `sum` is the pyspark.sql.functions aggregate imported at the top, not the builtin.
revenuebyday = revenue.groupBy('Subscription_Date').agg(
    sum(col('price')).alias('revenueByday')
)

# Reference the column with the exact case it was aliased with, so the filter
# does not depend on Spark's case-insensitive column resolution.
revenuebyday.where(col('revenueByday').isNotNull()).sort(col('Subscription_Date'), ascending=True).show()

25/01/01 09:09:46 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: store_id,installHistory,subscriptionHistory
 Schema: store_id
Expected: store_id but found: store_id,installHistory,subscriptionHistory
CSV file: file:///Users/macos/Documents/Python/Project-CV/Clearerio/Clearerio_assignment/files/dataset_sample.csv
                                                                                

+-----------------+------------+
|Subscription_Date|revenueByday|
+-----------------+------------+
|       2022-05-04|        19.0|
|       2022-05-08|        29.0|
|       2022-05-09|       148.2|
|       2022-05-10|       218.0|
|       2022-05-11|       117.0|
|       2022-05-12|        29.0|
|       2022-05-13|        29.0|
|       2022-05-15|       187.0|
|       2022-05-16|       308.2|
|       2022-05-17|       211.0|
|       2022-05-18|       175.0|
|       2022-05-19|        57.0|
|       2022-05-20|       264.0|
|       2022-05-21|        38.0|
|       2022-05-22|        76.0|
|       2022-05-23|       269.0|
|       2022-05-24|       243.0|
|       2022-05-25|       370.0|
|       2022-05-26|       601.0|
|       2022-05-27|       183.0|
+-----------------+------------+
only showing top 20 rows



In [22]:
# Distinct-store counts per activity_date, where activity_date is the day the
# install period started (to_date of fromDate):
#   Active_Users    - stores with a period whose toNow flag is true
#   New_user_by_day - stores with a period whose toNow flag is false
#   Uninstall_Users - stores with a period whose endType is CUSTOMER_CANCELLED
# NOTE(review): "New_user_by_day" is driven by toNow being false, and uninstalls are
# bucketed by the period's start date rather than its end date - confirm both
# match the intended metric definitions.
df_status = df_datasample.select(
    col("store_id"),
    col("toNow"),
    col("startType"),
    to_date(col("fromDate")).alias('activity_date'),
    col("endType"),
)

period_is_open = col('toNow') == 'true'
period_is_closed = col('toNow') == 'false'
cancelled_by_customer = col('endType') == 'CUSTOMER_CANCELLED'
store = col('store_id')

status = df_status.groupBy('activity_date').agg(
    countDistinct(when(period_is_open, store)).alias('Active_Users'),
    countDistinct(when(period_is_closed, store)).alias('New_user_by_day'),
    countDistinct(when(cancelled_by_customer, store)).alias('Uninstall_Users'),
)
status.orderBy('activity_date').show()


25/01/01 09:09:48 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: store_id,installHistory,subscriptionHistory
 Schema: store_id
Expected: store_id but found: store_id,installHistory,subscriptionHistory
CSV file: file:///Users/macos/Documents/Python/Project-CV/Clearerio/Clearerio_assignment/files/dataset_sample.csv


+-------------+------------+---------------+---------------+
|activity_date|Active_Users|New_user_by_day|Uninstall_Users|
+-------------+------------+---------------+---------------+
|   2022-04-29|           3|              7|              2|
|   2022-04-30|           2|             14|              1|
|   2022-05-01|           3|             15|              1|
|   2022-05-02|           7|             26|              4|
|   2022-05-03|          12|             21|              2|
|   2022-05-04|           8|             43|              9|
|   2022-05-05|           9|             31|              3|
|   2022-05-06|           9|             24|              4|
|   2022-05-07|           2|             14|              0|
|   2022-05-08|           1|             12|              4|
|   2022-05-09|          12|             34|              6|
|   2022-05-10|          11|             33|              6|
|   2022-05-11|          12|             36|              4|
|   2022-05-12|         