In [None]:
# Preparation (run once in a CQL shell over the PuTTY session): create the keyspace and tables used below
-- Create the keyspace that holds all MovieLens tables (no-op if it already exists).
-- SimpleStrategy with replication_factor 1 keeps a single copy of each row.
CREATE KEYSPACE IF NOT EXISTS movielens WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
-- Make movielens the default keyspace for the CREATE TABLE statements that follow.
USE movielens;

-- User table: one row per user, keyed by user_id.
-- Column names and order match the schema used when the u.user file is parsed in the PySpark cell.
CREATE TABLE IF NOT EXISTS users (
    user_id int PRIMARY KEY,
    age int,
    gender text,
    occupation text,
    zip text
);

-- Ratings table: one row per (user_id, movie_id) pair.
-- user_id is the partition key and movie_id the clustering column, so all
-- ratings by one user live in the same partition.
CREATE TABLE IF NOT EXISTS ratings (
    user_id int,
    movie_id int,
    rating int,
    -- presumably Unix epoch seconds as found in u.data; confirm against the dataset README
    timestamp int,
    PRIMARY KEY ((user_id), movie_id)
);

-- Movie table: one row per movie, keyed by movie_id.
-- Dates are stored as text exactly as they appear in the source file.
CREATE TABLE IF NOT EXISTS movies (
    movie_id int PRIMARY KEY,
    title text,
    release_date text,
    video_release_date text,
    imdb_url text,
    -- genre0 .. genre18: nineteen integer genre columns
    -- (presumably 0/1 membership flags; confirm against the u.item description)
    genre0 int, genre1 int, genre2 int, genre3 int, genre4 int,
    genre5 int, genre6 int, genre7 int, genre8 int, genre9 int,
    genre10 int, genre11 int, genre12 int, genre13 int, genre14 int,
    genre15 int, genre16 int, genre17 int, genre18 int
);

In [None]:
# One-time setup (from the PuTTY session): upload the ml-100k files to HDFS under /user/maria_dev/ml-100k

In [None]:
# PySpark + Cassandra
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, desc

# 1. Create SparkSession connected to Cassandra
def create_spark_session(cassandra_host="127.0.0.1", app_name="MovieLensAnalysis"):
    """Build (or reuse) a SparkSession configured for the Cassandra connector.

    Args:
        cassandra_host: Value stored under ``spark.cassandra.connection.host``.
            Defaults to the local node, which is what the notebook used originally.
        app_name: Spark application name passed to ``appName``.

    Returns:
        The session returned by ``SparkSession.builder.getOrCreate()``.
    """
    builder = SparkSession.builder.appName(app_name)
    builder = builder.config("spark.cassandra.connection.host", cassandra_host)
    return builder.getOrCreate()

# 2. Parse the u.user file from HDFS
def parse_user_file(spark):
    """Read the pipe-delimited u.user file from HDFS into a DataFrame.

    The five columns are typed explicitly so nothing is left to inference:
    user_id, age, gender, occupation and zip.
    """
    column_defs = [
        "user_id INT",
        "age INT",
        "gender STRING",
        "occupation STRING",
        "zip STRING",
    ]
    reader = spark.read.schema(", ".join(column_defs)).option("sep", "|")
    return reader.csv("hdfs:///user/maria_dev/ml-100k/u.user")

# 3. Load u.data as RDD from HDFS
def load_ratings_rdd(spark):
    """Return the raw lines of the u.data ratings file on HDFS as an RDD of strings."""
    ratings_path = "hdfs:///user/maria_dev/ml-100k/u.data"
    context = spark.sparkContext
    return context.textFile(ratings_path)

# 4. Convert RDD to DataFrame
def ratings_rdd_to_df(spark, rdd):
    """Convert an RDD of tab-separated rating lines into a typed DataFrame.

    Each non-blank line is expected to hold four integer fields:
    user_id, movie_id, rating and timestamp.

    Args:
        spark: Session used to build the DataFrame.
        rdd: RDD of text lines (as produced by ``textFile``).

    Returns:
        DataFrame with INT columns user_id, movie_id, rating, timestamp.

    Raises:
        The Spark job fails when a non-blank line has fewer than four fields
        or a field is not an integer; malformed data is not silently dropped.
    """
    # An explicit schema avoids the inference pass (which fails on an empty RDD)
    # and yields INT columns matching the `int` columns of the Cassandra table,
    # instead of the bigint columns inferred from Python ints.
    schema = "user_id INT, movie_id INT, rating INT, timestamp INT"

    def parse_line(line):
        fields = line.split("\t")
        return (int(fields[0]), int(fields[1]), int(fields[2]), int(fields[3]))

    # Blank or whitespace-only lines (e.g. a trailing newline) carry no rating.
    rows = rdd.filter(lambda line: line.strip() != "").map(parse_line)
    return spark.createDataFrame(rows, schema)

# 5. Write DataFrame into Cassandra
def write_df_to_cassandra(df, table):
    """Replace the contents of a table in the ``movielens`` keyspace with ``df``.

    Args:
        df: DataFrame whose columns match the target table.
        table: Name of the Cassandra table inside the ``movielens`` keyspace.

    Returns:
        None. The write is performed for its side effect.
    """
    writer = df.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table=table, keyspace="movielens") \
        .mode("overwrite")
    # The Cassandra connector refuses overwrite mode (which truncates the
    # table first) unless the truncation is confirmed explicitly.
    writer.option("confirm.truncate", "true").save()

# 6. Read table from Cassandra into DataFrame
def read_from_cassandra(spark, table):
    """Load a table of the ``movielens`` keyspace through the Cassandra data source.

    Returns the DataFrame produced by the reader's ``load()`` call.
    """
    source_options = {"table": table, "keyspace": "movielens"}
    reader = spark.read.format("org.apache.spark.sql.cassandra")
    return reader.options(**source_options).load()

# Parse u.item file
def parse_movies_file(spark):
    """Read the pipe-delimited u.item file from HDFS into a DataFrame.

    The schema has five descriptive columns (movie_id, title, release_date,
    video_release_date, imdb_url) followed by nineteen INT columns
    genre0 .. genre18.

    Args:
        spark: Session whose reader is used to load the file.

    Returns:
        DataFrame with the 24 columns described above.
    """
    item_schema = "movie_id INT, title STRING, release_date STRING, video_release_date STRING, imdb_url STRING, " + \
                  ",".join(["genre{} INT".format(i) for i in range(19)])
    # The MovieLens 100K u.item file is Latin-1 encoded; with Spark's default
    # UTF-8 decoding, titles containing accented characters come out garbled.
    return spark.read.csv(
        "hdfs:///user/maria_dev/ml-100k/u.item",
        sep="|",
        schema=item_schema,
        encoding="ISO-8859-1",
    )

# Main Program for analysis
spark = create_spark_session()

# try/finally guarantees the session is stopped even when a load or an
# analysis step raises, so a failed run does not leave the session running.
try:
    # --- Load the three source files and persist each one to Cassandra ---
    print(">>> Loading user data")
    users_df = parse_user_file(spark)
    write_df_to_cassandra(users_df, "users")

    print(">>> Loading rating data")
    ratings_rdd = load_ratings_rdd(spark)
    ratings_df = ratings_rdd_to_df(spark, ratings_rdd)
    write_df_to_cassandra(ratings_df, "ratings")

    print(">>> Loading movie data")
    movies_df = parse_movies_file(spark)
    write_df_to_cassandra(movies_df, "movies")

    # --- Analyses (run on the DataFrames loaded above) ---
    print("(i) Average rating per movie:")
    avg_rating_df = ratings_df.groupBy("movie_id").agg(avg("rating").alias("avg_rating"))
    avg_rating_df.show(10)

    print("(ii) Top 10 movies with highest average rating:")
    # movie_id is a secondary sort key: several movies can share the same
    # average, and without a tie-break the rows shown could differ per run.
    top10_avg = avg_rating_df.join(movies_df, on="movie_id") \
        .orderBy(desc("avg_rating"), col("movie_id")) \
        .select("movie_id", "title", "avg_rating")
    top10_avg.show(10, truncate=False)

    print("(iii) Users with at least 50 ratings and their favorite genres:")
    active_users = ratings_df.groupBy("user_id").agg(count("movie_id").alias("movie_count")) \
        .filter(col("movie_count") >= 50)

    # Attach every rating of the active users, then the genre columns of each rated movie.
    user_rated = active_users.join(ratings_df, on="user_id")
    user_movies = user_rated.join(movies_df, on="movie_id")

    # Per user, the mean of each genre column over the movies they rated;
    # a higher score means a larger share of their rated movies carries that genre value.
    fav_genres = user_movies.groupBy("user_id") \
        .agg(*[avg("genre{}".format(i)).alias("genre{}_score".format(i)) for i in range(19)])
    fav_genres.show(10)

    print("(iv) Users under 20 years old:")
    users_df.filter(col("age") < 20).show(10)

    print("(v) Users aged 30-40 with occupation 'scientist':")
    users_df.filter((col("occupation") == "scientist") & (col("age") >= 30) & (col("age") <= 40)).show(10)
finally:
    spark.stop()