In [1]:
from pyspark.sql.functions import col,split,concat,lit,substring_index,slice,desc, rank,count,max,countDistinct
from pyspark.sql.window import Window
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import findspark
import logging
import unittest

In [2]:
df=spark.read.option("multiLine", "true").json("/home/data/") 
display(df)
             

NameError: name 'spark' is not defined

In [None]:

# create logger
logger = logging.getLogger('volkwagon')
logger.setLevel(logging.INFO)

# create file handler and set logger level to INFO
log_path = "/logs"
file_handler = logging.FileHandler(log_path)
file_handler.setLevel(logging.INFO)

# create console handler and set logger level to INFO
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

# create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# add formatter to file handler and console handler
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)

# add file handler and console handler to logger
logger.addHandler(file_handler)
logger.addHandler(console_handler)



In [None]:


def most_sold_cars():

    try:
        logging.info("Deriving new columns...")
        derived_df = df.withColumn('Brand', split(col('name'), ' ')[0]) \
                       .withColumn('Model', split(col('name'), ' ', 2)[1])

        logging.info("Selecting relevant columns...")
        selected_df = derived_df.select('Brand','Model','Year','Origin','Acceleration','Cylinders','Displacement','Horsepower','Miles_per_Gallon','Weight_in_lbs')

        logging.info("Grouping and counting cars by brand and year...")
        df_count_brand = selected_df.groupBy("year", "brand") \
                  .count()

        logging.info("Finding most sold car by brand and year...")
        part_df = Window.partitionBy("year") \
                    .orderBy(desc("count"), desc(concat("year", "count")))
        df_final = df_count_brand.withColumn("sold_brand", col("count")) \
                       .withColumn("rank", rank().over(part_df)) \
                       .filter(col("rank") == 1) \
                       .drop("count", "rank")

        logging.info("Displaying derived and counted data...")
        display(df_final)

        logging.info("Writing data to data lake...")
        df_final.write.mode("overwrite").partitionBy("year", "brand").parquet('/year/brand')
        
        logging.info("Function completed successfully.")

        return df_final
    except Exception as e:
        logging.error("An error occurred: %s", str(e))
        print("An error occurred: ", str(e))
        return None

In [None]:
most_sold_cars()

year,brand,sold_brand
1970-01-01,ford,5
1970-01-01,plymouth,5
1970-01-01,amc,5
1971-01-01,ford,5
1972-01-01,ford,4
1973-01-01,chevrolet,6
1974-01-01,ford,4
1975-01-01,ford,5
1976-01-01,ford,5
1977-01-01,chevrolet,4


DataFrame[year: string, brand: string, sold_brand: bigint]

In [None]:
def acceleration_by_origin():
    try:
        logger = logging.getLogger('acceleration_by_origin')
                
        logging.info("Deriving new columns...")
        derived_df = df.withColumn('Brand', split(col('name'), ' ')[0]) \
                       .withColumn('Model', split(col('name'), ' ', 2)[1])

        logging.info("Selecting relevant columns...")
        selected_df = derived_df.select('Brand','Model','Origin','Acceleration','Cylinders','Displacement','Horsepower','Miles_per_Gallon','Weight_in_lbs') \
            .withColumn('Acceleration',derived_df['Acceleration'].cast('double'))

        logging.info("Finding acceleration by origin...")
        part_spec = Window.partitionBy("origin") \
                        .orderBy(desc("acceleration"))

        df_fastest = selected_df.withColumn("rank", rank().over(part_spec)) \
                .filter(col("rank") == 1) \
                .drop("rank")

        logging.info("Displaying derived and counted data...")
        display(df_fastest)

        logging.info("Writing data to data lake...")
        df_fastest.write.mode("overwrite").partitionBy("origin").parquet('/origin/brand')

        logging.info("Function completed successfully.")

        return df_fastest
    except Exception as e:
        logging.error("An error occurred: %s", str(e))
        print("An error occurred: ", str(e))
        return None

In [None]:
acceleration_by_origin()

Brand,Model,Origin,Acceleration,Cylinders,Displacement,Horsepower,Miles_per_Gallon,Weight_in_lbs
peugeot,504,Europe,24.8,4,141.0,71,27.2,3190
toyota,corolla 1200,Japan,21.0,4,71.0,65,32.0,1836
chevrolet,monte carlo,USA,9.5,8,400.0,150,15.0,3761
pontiac,grand prix,USA,9.5,8,400.0,230,16.0,4278


DataFrame[Brand: string, Model: string, Origin: string, Acceleration: string, Cylinders: bigint, Displacement: double, Horsepower: bigint, Miles_per_Gallon: double, Weight_in_lbs: bigint]