In [9]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs

def load_movie_names(path="C:/SparkCourse/ml-100k/u.ITEM"):
    """
    Load movie names from the u.ITEM file and return a dictionary mapping movie IDs to movie titles.

    Args:
        path: Location of the pipe-delimited item file. Defaults to the
            path this notebook was originally written against.

    Returns:
        dict mapping the integer in the first field of each line to the
        string in the second field.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the first field of a non-blank line is not an integer.
        IndexError: if a non-blank line has no '|' separator.
    """
    movie_names = {}
    # Use the built-in open() rather than codecs.open(): the codecs stream
    # reader splits lines with str.splitlines(), which also breaks on
    # characters such as U+0085 (byte 0x85 in ISO-8859-1) and would cut a
    # record in half if a title contained one. open() only breaks on
    # \n, \r and \r\n.
    with open(path, "r", encoding="ISO-8859-1", errors="ignore") as f:
        for line in f:
            # Skip blank lines (e.g. a trailing empty line) instead of
            # failing on int('').
            if not line.strip():
                continue
            fields = line.split('|')
            movie_names[int(fields[0])] = fields[1]
    return movie_names

def main():
    """
    Print the ten most-rated movies from the u.data ratings file.

    Reads the tab-separated ratings file with an explicit schema, counts the
    rows per movieID, attaches a title to each movieID by looking it up in a
    broadcast dictionary built by load_movie_names() ("Unknown" when the ID is
    not in the dictionary), sorts by count in descending order and shows the
    first 10 rows untruncated. The Spark session is stopped on exit, including
    when a step raises.
    """
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("PopularMovies") \
        .getOrCreate()

    # try/finally so the session is released even if loading or the job fails.
    try:
        # Broadcast the movie names dictionary
        name_dict = spark.sparkContext.broadcast(load_movie_names())

        # Define schema for the u.data file
        schema = StructType([
            StructField("userID", IntegerType(), True),
            StructField("movieID", IntegerType(), True),
            StructField("rating", IntegerType(), True),
            StructField("timestamp", LongType(), True)
        ])

        # Load movie data as a DataFrame
        movies_df = spark.read \
            .option("sep", "\t") \
            .schema(schema) \
            .csv("file:///SparkCourse/ml-100k/u.data")

        # Group by movieID and count occurrences
        movie_counts = movies_df.groupBy("movieID").count()

        # User-defined function to look up movie names
        def lookup_name(movie_id):
            return name_dict.value.get(movie_id, "Unknown")

        # Register the UDF
        lookup_name_udf = functions.udf(lookup_name)

        # Add a movieTitle column using the UDF. The column is named
        # "movieTitle" (not "movie") so the printed header matches the
        # recorded output of this cell.
        movies_with_names = movie_counts.withColumn(
            "movieTitle", lookup_name_udf(functions.col("movieID"))
        )

        # Sort the results by count in descending order
        sorted_movies_with_names = movies_with_names.orderBy(functions.desc("count"))

        # Show the top 10 most popular movies
        sorted_movies_with_names.show(10, truncate=False)
    finally:
        # Stop the Spark session
        spark.stop()

# Run the job only when this code is executed directly (as a script or as a
# notebook cell, where __name__ is "__main__"), not when it is imported.
if __name__ == "__main__":
    main()


+-------+-----+-----------------------------+
|movieID|count|movieTitle                   |
+-------+-----+-----------------------------+
|50     |583  |Star Wars (1977)             |
|258    |509  |Contact (1997)               |
|100    |508  |Fargo (1996)                 |
|181    |507  |Return of the Jedi (1983)    |
|294    |485  |Liar Liar (1997)             |
|286    |481  |English Patient, The (1996)  |
|288    |478  |Scream (1996)                |
|1      |452  |Toy Story (1995)             |
|300    |431  |Air Force One (1997)         |
|121    |429  |Independence Day (ID4) (1996)|
+-------+-----+-----------------------------+
only showing top 10 rows



In [1]:
# Implement Total Spent by Customer with DataFrames
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# Get (or create) the Spark session for this job, using the local[*] master.
spark = (
    SparkSession.builder
    .appName("TotalSpentByCustomer")
    .master("local[*]")
    .getOrCreate()
)

# Explicit schema applied when reading customer-orders:
# two integer columns followed by a float amount, all nullable.
customerOrderSchema = StructType([
    StructField("cust_id", IntegerType(), True),
    StructField("item_id", IntegerType(), True),
    StructField("amount_spent", FloatType(), True),
])

# Read the CSV into a DataFrame with that schema.
customersDF = (
    spark.read
    .schema(customerOrderSchema)
    .csv("file:///Spark/customer-orders.csv")
)

# One row per customer: the sum of amount_spent, rounded to two decimals.
totalByCustomer = (
    customersDF
    .groupBy("cust_id")
    .agg(
        func.round(func.sum("amount_spent"), 2).alias("total_spent")
    )
)

# Order the per-customer totals by total_spent (ascending, the default).
totalByCustomerSorted = totalByCustomer.sort("total_spent")

# Print every row: the row count is passed as the number of rows to show.
totalByCustomerSorted.show(
    totalByCustomerSorted.count()
)

# The session is not stopped here; the call is left disabled as in the original.
# NOTE(review): confirm whether later cells still need this session.
# spark.stop()

+-------+-----------+
|cust_id|total_spent|
+-------+-----------+
|     45|    3309.38|
|     79|    3790.57|
|     96|    3924.23|
|     23|    4042.65|
|     99|    4172.29|
|     75|     4178.5|
|     36|    4278.05|
|     98|    4297.26|
|     47|     4316.3|
|     77|    4327.73|
|     13|    4367.62|
|     48|    4384.33|
|     49|     4394.6|
|     94|    4475.57|
|     67|    4505.79|
|     50|    4517.27|
|     78|    4524.51|
|      5|    4561.07|
|     57|     4628.4|
|     83|     4635.8|
|     91|    4642.26|
|     74|    4647.13|
|     84|    4652.94|
|      3|    4659.63|
|     12|    4664.59|
|     66|    4681.92|
|     56|    4701.02|
|     21|    4707.41|
|     80|    4727.86|
|     14|    4735.03|
|     37|     4735.2|
|      7|    4755.07|
|     44|    4756.89|
|     31|    4765.05|
|     82|    4812.49|
|      4|    4815.05|
|     10|     4819.7|
|     88|    4830.55|
|     20|    4836.86|
|     89|    4851.48|
|     95|    4876.84|
|     38|    4898.46|
|     76| 