In [0]:
#%fs ls /FileStore/tables/Product-Sales/
fileName = "/FileStore/tables/Product-Sales/Product_Sales.csv"
sales_df = spark.read\
                .format("csv")\
                .option("inferSchema","false")\
                .option("header","true")\
                .option("sep",",")\
                .load(fileName)
sales_df = sales_df.drop("_c0")
sales_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Q-P1: string (nullable = true)
 |-- Q-P2: string (nullable = true)
 |-- Q-P3: string (nullable = true)
 |-- Q-P4: string (nullable = true)
 |-- S-P1: string (nullable = true)
 |-- S-P2: string (nullable = true)
 |-- S-P3: string (nullable = true)
 |-- S-P4: string (nullable = true)



In [0]:
##DDL Schema def
from pyspark.sql.types import *

sales_struct_schema = StructType([
    StructField("_c0", StringType()),
    StructField("Date", StringType()),
    StructField("Q-P1", IntegerType()),
    StructField("Q-P2", IntegerType()),
    StructField("Q-P3", StringType()), 
    StructField("Q-P4", IntegerType()),
    StructField("S-P1", FloatType()),
    StructField("S-P2", FloatType()),
    StructField("S-P3", FloatType()),
    StructField("S-P4", FloatType())
])

sales_df = spark.read\
                .format("csv")\
                .schema(sales_struct_schema)\
                .option("header","true")\
                .load(fileName)
sales_df = sales_df.drop("_c0")
sales_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Q-P1: integer (nullable = true)
 |-- Q-P2: integer (nullable = true)
 |-- Q-P3: string (nullable = true)
 |-- Q-P4: integer (nullable = true)
 |-- S-P1: float (nullable = true)
 |-- S-P2: float (nullable = true)
 |-- S-P3: float (nullable = true)
 |-- S-P4: float (nullable = true)



In [0]:
from pyspark.sql.functions import *
sales_df = sales_df.withColumnRenamed("Q-p1", "Q-Product1")\
                   .withColumnRenamed("Q-p2", "Q-Product2")\
                   .withColumnRenamed("Q-p3", "Q-Product3")\
                   .withColumnRenamed("Q-p3", "Q-Product4")\
                   .withColumnRenamed("S-p1", "S-Product1")\
                   .withColumnRenamed("S-p2", "S-Product2")\
                   .withColumnRenamed("S-p3", "S-Product3")\
                   .withColumnRenamed("S-p4", "S-Product4")
sales_df.show(20)

+----------+----------+----------+----------+----+----------+----------+----------+----------+
|      Date|Q-Product1|Q-Product2|Q-Product3|Q-P4|S-Product1|S-Product2|S-Product3|S-Product4|
+----------+----------+----------+----------+----+----------+----------+----------+----------+
|13-06-2010|      5422|      3725|       576| 907|  17187.74|   23616.5|   3121.92|   6466.91|
|14-06-2010|      7047|       779|      3578|1574|  22338.99|   4938.86|  19392.76|  11222.62|
|15-06-2010|      1572|      2082|       595|1145|   4983.24|  13199.88|    3224.9|   8163.85|
|16-06-2010|      5657|      2399|      3140|1672|  17932.69|  15209.66|   17018.8|  11921.36|
|17-06-2010|      3668|      3207|      2184| 708|  11627.56|  20332.38|  11837.28|   5048.04|
|18-06-2010|      2898|      2539|       311|1513|   9186.66|  16097.26|   1685.62|  10787.69|
|19-06-2010|      6912|      1470|      1576|1608|  21911.04|    9319.8|   8541.92|  11465.04|
|20-06-2010|      5209|      2550|      3415| 842|

In [0]:
from pyspark.sql import functions as SF
from pyspark.sql.window import Window

get_year = SF.year(SF.to_date(col("Date"), "dd-MM-yyyy"))
get_month = SF.month(SF.to_date(col("Date"), "dd-MM-yyyy"))
get_month_unique_count = countDistinct("Month").alias("distinct_count")
filter_incomplete_year = "distinct_count < 12"

unwanted_yrs_df = sales_df.withColumn("Month", get_month)\
                   .withColumn("Year", get_year)\
                   .groupBy("Year")\
                   .agg(get_month_unique_count)\
                   .filter(col("Year").isNotNull())

unwanted_yrs_df = unwanted_yrs_df.where(filter_incomplete_year).select("Year")
unwanted_yrs = list(unwanted_yrs_df.select(unwanted_yrs_df.Year).toPandas()['Year'])

In [0]:
sales_df = sales_df.withColumn("Year", get_year)\
                   .filter(~col("Year").isin(unwanted_yrs)).show() 


+----------+----------+----------+----------+----+----------+----------+----------+----------+----+
|      Date|Q-Product1|Q-Product2|Q-Product3|Q-P4|S-Product1|S-Product2|S-Product3|S-Product4|Year|
+----------+----------+----------+----------+----+----------+----------+----------+----------+----+
|01-01-2011|       281|      3956|      4186|1537|    890.77|  25081.04|  22688.12|  10958.81|2011|
|02-01-2011|      7665|      1350|      4266|1789|  24298.05|    8559.0|  23121.72|  12755.57|2011|
|03-01-2011|       937|      3758|      4311| 314|   2970.29|  23825.72|  23365.62|   2238.82|2011|
|04-01-2011|      6378|       968|      4530| 995|  20218.26|   6137.12|   24552.6|   7094.35|2011|
|05-01-2011|       731|      2174|      5908|1505|   2317.27|  13783.16|  32021.36|  10730.65|2011|
|06-01-2011|      2032|      3556|       349| 815|   6441.44|  22545.04|   1891.58|   5810.95|2011|
|07-01-2011|      1514|      3321|      5699|1371|   4799.38|  21055.14|  30888.58|   9775.23|2011|
