In [None]:
# Placeholder cell: only prints a fixed marker string.
marker_text = 'Lalala'
print(marker_text)

In [None]:
# Load the raw transactions CSV. No schema is passed and inferSchema is left at
# its default (false), so every column of base_df is a string.
bucket_name = "yc-dataproc-tasks1"
transactions_csv_path = f's3a://{bucket_name}/data/transaction_data.csv'

base_df = (
    spark.read
    .option("header", True)  # first line of the file holds the column names
    .csv(transactions_csv_path)
)
base_df.printSchema()

In [27]:
# Column names of the raw frame, in file order.
list(base_df.columns)

['household_key',
 'BASKET_ID',
 'DAY',
 'PRODUCT_ID',
 'QUANTITY',
 'SALES_VALUE',
 'STORE_ID',
 'RETAIL_DISC',
 'TRANS_TIME',
 'WEEK_NO',
 'COUPON_DISC',
 'COUPON_MATCH_DISC']

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType 

In [None]:
# Explicit schema for transaction_data.csv (without one, every column is read
# as a string).
# - SALES_VALUE holds decimal amounts (e.g. 23.97), which IntegerType cannot
#   parse; under the default PERMISSIVE CSV mode they silently become null,
#   so it is declared DoubleType.
# - BASKET_ID / PRODUCT_ID / STORE_ID are assumed to be integer identifiers
#   (TODO: confirm against the data); LongType keeps them exact instead of the
#   float / scientific-notation values DoubleType produces.
# - The *_DISC columns are assumed to be numeric discount amounts, hence
#   DoubleType rather than StringType (TODO: confirm against the data).
from pyspark.sql.types import LongType  # not part of the types import cell

schema = StructType([
    StructField('household_key', IntegerType()),
    StructField('BASKET_ID', LongType()),
    StructField('DAY', IntegerType()),
    StructField('PRODUCT_ID', LongType()),
    StructField('QUANTITY', IntegerType()),
    StructField('SALES_VALUE', DoubleType()),
    StructField('STORE_ID', LongType()),
    StructField('RETAIL_DISC', DoubleType()),
    StructField('TRANS_TIME', IntegerType()),
    StructField('WEEK_NO', IntegerType()),
    StructField('COUPON_DISC', DoubleType()),
    StructField('COUPON_MATCH_DISC', DoubleType()),
])

In [None]:
# Re-read the same CSV, this time typed by the explicit `schema`.
df_custom_schema = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv(f's3a://{bucket_name}/data/transaction_data.csv')
)
df_custom_schema.printSchema()

In [None]:
from pyspark.sql.functions import col, lit

In [None]:
# Several ways to project columns from the raw frame (first 5 rows each).

# 1. Every column, passed as a list of names.
base_df.select(base_df.columns).show(5)

# 2. A subset of columns by name.
base_df.select("BASKET_ID", "SALES_VALUE", "RETAIL_DISC").show(5)

# 3. Column expressions: a scaled column, an existing column and a constant.
scaled_sales = (col("SALES_VALUE") * 100).alias("FIRST COLUMN")
constant_marker = lit("Lallala").alias("MORE COLUMNS")
base_df.select(scaled_sales, col("PRODUCT_ID"), constant_marker).show(5)

# 4. Columns given as SQL expression strings.
base_df.selectExpr('PRODUCT_ID', 'TRANS_TIME').show(5)

In [None]:
from pyspark.sql.functions import col, when, lower, regexp_replace, length, expr

In [None]:
# Bucket households by the size of their key and apply a demo transform to DAY.
# base_df was read without a schema, so household_key is a string. Compare its
# numeric value rather than its string length: the length-based version
# labelled 3-digit keys '101-999' although 100 belongs to that bucket.
# Keys that cannot be cast to int match no branch and get a null bucket
# instead of a wrong label.
household_key_num = col('household_key').cast('int')

(
    base_df
    .select('household_key', 'DAY')
    .dropDuplicates(['household_key'])  # which DAY survives per household is not guaranteed
    .withColumn(
        'shortKeyDescription',
        when(household_key_num < 100, 'Less 100')
        .when(household_key_num < 1000, '100-999')
        .when(household_key_num >= 1000, '1000+')
    )
    # DAY > 500 is shifted by 10000; otherwise it becomes sqrt(DAY) rounded to 3 places.
    .withColumn('DAY', expr('case when DAY > 500 then DAY + 10000 else round(sqrt(DAY), 3) end'))
    .show(10)
)

In [None]:
from pyspark.sql.functions import avg, col, count, min, max, sum

In [None]:
# Per-product aggregates: average SALES_VALUE and total QUANTITY, top 10 by quantity.
# (avg / sum here are the pyspark.sql.functions versions, not the builtins.)
(
    base_df
    .select("PRODUCT_ID", "QUANTITY", "SALES_VALUE")
    .groupBy("PRODUCT_ID")
    .agg(avg("SALES_VALUE").alias("Lala1"), sum("QUANTITY").alias('Lala2'))
    .orderBy(col('Lala2').desc())
    .show(10)
)

# Number of distinct products per store, ordered by store id.
# STORE_ID is a string column in base_df (read without a schema), so sorting
# the raw value is lexicographic ('10' before '2'); sort on its integer value.
(
    base_df
    .select('STORE_ID', 'PRODUCT_ID')
    .distinct()
    .groupBy('STORE_ID')
    .agg(count('*').alias('Lala'))
    .orderBy(col('STORE_ID').cast('int'))
    .show(10)
)

In [None]:
from pyspark.sql.functions import lower, upper, substring, split, trim, regexp_replace, length

In [None]:
# Sample labels for the string-function demo: WEEK_NO 1..20 map onto these
# country names through a SQL CASE expression; any other week is 'Unknown'.
countries = [
    "United States of America", "Canada", "Germany", "France", "Italy",
    "Czech Republic", "Australia", "Brazil", "China", "India",
    "Mexico", "Russian Federation", "Japan", "South Korea", "United Kingdom",
    "Norway", "Sweden", "Denmark", "Finland", "Netherlands",
]

when_clauses = [
    f"when WEEK_NO = {week_no} then '{country}'"
    for week_no, country in enumerate(countries, start=1)
]
expr_query = "case " + " ".join(when_clauses) + " else 'Unknown' end as country"

# One row per distinct WEEK_NO, labelled with a country by expr_query.
df_with_counties = (
    base_df
    .dropDuplicates(['WEEK_NO'])
    .orderBy('WEEK_NO')
    .selectExpr(expr_query)
)

# String functions applied to the distinct country labels.
# If WEEK_NO goes beyond 20 there are 21 distinct labels (incl. 'Unknown'),
# and limit(20) drops one of them; which one is not guaranteed.
string_demo_columns = [
    'country',
    lower('country').alias('lower'),
    upper('country').alias('upper'),
    substring('country', 1, 4).alias('substring'),
    split('country', ' ').alias('split'),
    trim('country').alias('trim'),
    regexp_replace('country', 'e', 'EEE').alias('regexp'),
    length('country').alias('length'),
]
(
    df_with_counties
    .dropDuplicates(['country'])
    .limit(20)
    .select(*string_demo_columns)
    .show(truncate=False)
)

In [26]:
from pyspark.sql.functions import abs, round, ceil, pow, sqrt

In [28]:
# Numeric functions applied to 15 distinct SALES_VALUE values.
# abs / round / pow are the pyspark.sql.functions versions (they shadow the
# builtins). Each result column gets its own alias; round and pow were
# previously both aliased 'abs', leaving three indistinguishable 'abs' columns.
(
    base_df
    .dropDuplicates(['SALES_VALUE'])
    .limit(15)
    .select(
        'SALES_VALUE',
        abs('SALES_VALUE').alias('abs'),
        round('SALES_VALUE').alias('round'),
        ceil('SALES_VALUE').alias('ceil'),
        pow('SALES_VALUE', 3).alias('pow'),
        sqrt('SALES_VALUE').alias('sqrt'),
    )
    .show()
)



+-----------+-----+-----+----+------------------+------------------+
|SALES_VALUE|  abs|  abs|ceil|               abs|              sqrt|
+-----------+-----+-----+----+------------------+------------------+
|      23.97|23.97| 24.0|  24|13772.224772999998| 4.895916665957459|
|       7.16| 7.16|  7.0|   8|        367.061696|2.6758176320519302|
|        8.5|  8.5|  9.0|   9|           614.125|2.9154759474226504|
|      17.42|17.42| 17.0|  18|5286.2104880000015| 4.173727350941841|
|      19.06|19.06| 19.0|  20| 6924.185415999998| 4.365775990588615|
|      20.64|20.64| 21.0|  21| 8792.838144000001|4.5431266766402185|
|       10.7| 10.7| 11.0|  11|1225.0429999999997| 3.271085446759225|
|      26.63|26.63| 27.0|  27|18884.848246999998| 5.160426338976267|
|      31.47|31.47| 31.0|  32|31166.657522999998|   5.6098128311023|
|      65.66|65.66| 66.0|  66|     283075.729496| 8.103085831953157|
|      34.63|34.63| 35.0|  35| 41529.57384700001| 5.884725991921799|
|      42.87|42.87| 43.0|  43| 787

                                                                                

In [35]:
from pyspark.sql.functions import array_contains, array_join, array_sort, size

In [36]:
# Array functions on the word list obtained by splitting each country label.
countries_split = df_with_counties.select(
    'country',
    split('country', ' ').alias('splitted'),
).distinct()

countries_split.select(
    'country',
    'splitted',
    array_contains('splitted', 'United'),  # does the label contain the word "United"?
    array_join('splitted', '_'),           # words re-joined with '_'
    size('splitted'),                      # number of words
).show()



+--------------------+--------------------+--------------------------------+-----------------------+--------------+
|             country|            splitted|array_contains(splitted, United)|array_join(splitted, _)|size(splitted)|
+--------------------+--------------------+--------------------------------+-----------------------+--------------+
|United States of ...|[United, States, ...|                            true|   United_States_of_...|             4|
|             Germany|           [Germany]|                           false|                Germany|             1|
|              Mexico|            [Mexico]|                           false|                 Mexico|             1|
|         Netherlands|       [Netherlands]|                           false|            Netherlands|             1|
|               Italy|             [Italy]|                           false|                  Italy|             1|
|               China|             [China]|                           fa

                                                                                