### This example was created in Zeppelin

In [54]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, round
from pyspark.sql.types import IntegerType, DoubleType


class SparkJob:
    """Demo PySpark job: read a CSV, count rows per date, filter, and write.

    The methods form a chain in which each step calls the previous one, so
    invoking any step re-reads the source CSV from disk. Several steps also
    print the schema or a preview of the data as a side effect.
    """

    def __init__(self):
        """Create (or reuse) the Spark session and set the data directory."""
        builder = SparkSession.builder.appName("Python Spark SQL basic example")
        builder = builder.config("spark.some.config.option", "some-value")
        self.spark = builder.getOrCreate()
        # Directory name, resolved relative to the parent of the working
        # directory; it is used for both the input CSV and the written output.
        self.input_directory = "datasets"

    def extract_timeseries(self):
        """Load the sample CSV (with a header row) and print its schema.

        Returns:
            The DataFrame read from
            ``../<input_directory>/sample_dataframe/sample_dataframe.csv``.
        """
        name = "sample_dataframe"
        reader = self.spark.read.option("header", True)
        csv_path = f"../{self.input_directory}/{name}/{name}.csv"
        frame = reader.csv(csv_path)
        frame.printSchema()
        return frame

    def agg_date_select(self):
        """Show the first 10 values of the ``date`` column.

        Returns:
            The full, unprojected DataFrame; only the preview is restricted
            to the ``date`` column.
        """
        frame = self.extract_timeseries()
        dates_only = frame.select("date")
        dates_only.show(10)
        return frame

    def agg_date_count_orderded(self):
        """Count rows per ``date`` and order by ``date`` then ``count``.

        Returns:
            A DataFrame with the columns ``date`` and ``count``; it is also
            printed untruncated.
        """
        per_date = self.extract_timeseries().groupby("date").count()
        ordered = per_date.orderBy("date", "count")
        ordered.show(truncate=False)
        return ordered

    def agg_date_count_orderded_fitler(self):
        """Keep dates with fewer than 68 rows and add an integer ``percent``.

        Returns:
            The filtered per-date counts with an extra ``percent`` column;
            the result is also printed untruncated.
        """
        counts = self.agg_date_count_orderded()

        # Only dates whose row count is strictly below 68 survive.
        below_limit = counts.filter("count < 68")

        # Add a constant column, then replace it with its integer cast.
        with_percent = below_limit.withColumn("percent", lit(23.354))
        percent_as_int = with_percent["percent"].cast(IntegerType())
        result = with_percent.withColumn("percent", percent_as_int)

        result.show(truncate=False)
        return result

    def agg_date_count_orderded_fitler_write(self):
        """Write the filtered aggregation as CSV and as Parquet.

        Both outputs go under ``../<input_directory>/`` in folders named
        ``sample_dataframe_aggregation_csv`` and
        ``sample_dataframe_aggregation_parquet``; existing output is
        overwritten.
        """
        result = self.agg_date_count_orderded_fitler()
        print("writing data to directory")
        target_prefix = f"../{self.input_directory}/sample_dataframe_aggregation"
        # CSV first, then Parquet; the format name doubles as the folder suffix.
        for fmt in ("csv", "parquet"):
            result.write.mode("overwrite").format(fmt).save(f"{target_prefix}_{fmt}")
        print("completed.")

In [55]:
# Build the job (this creates or reuses the SparkSession) and run the full
# chain: read the CSV, count rows per date, filter, then write CSV and Parquet.
job = SparkJob()
job.agg_date_count_orderded_fitler_write()

root
 |-- a: string (nullable = true)
 |-- b: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- date: string (nullable = true)

+----------+-----+
|date      |count|
+----------+-----+
|2020-03-15|68   |
|2020-03-16|32   |
+----------+-----+

+----------+-----+-------+
|date      |count|percent|
+----------+-----+-------+
|2020-03-16|32   |23     |
+----------+-----+-------+

writing data to directory
completed.
