In [1]:
from pyspark.sql import SparkSession, functions as F

In [2]:
# Build the session in two steps: name the application first, then obtain the
# session object that every later cell uses as `spark`.
session_builder = SparkSession.builder.appName("movies_and_actors")
spark = session_builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/03 12:01:03 WARN Utils: Your hostname, Evgeniys-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.50.253 instead (on interface en0)
25/12/03 12:01:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/Users/ekrasnikov/spark-4.0.1/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/ekrasnikov/.ivy2.5.2/cache
The jars for the packages stored in: /Users/ekrasnikov/.ivy2.5.2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f2c5feed-3dca-406b-b323-14778759fc02;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.367 in 

In [4]:
# The three CSV files are read with identical options: the first row is taken
# as the header and column types are inferred from the data.
csv_options = {"header": True, "inferSchema": True}

movies_df = spark.read.csv("./movies.csv", **csv_options)
actors_df = spark.read.csv("./actors.csv", **csv_options)
movie_actors_df = spark.read.csv("./movie_actors.csv", **csv_options)

In [8]:
# Create temporary views

In [9]:
# Register each DataFrame under the table name that the SQL queries refer to.
views_by_name = {
    "movies": movies_df,
    "actors": actors_df,
    "movie_actors": movie_actors_df,
}
for view_name, frame in views_by_name.items():
    frame.createOrReplaceTempView(view_name)

In [10]:
# Top 5 genres by number of movies

In [11]:
# Genres ranked by how many movies they contain, capped at the five largest.
# LIMIT 5 is what makes this a "top 5": without it every genre is returned, and
# the result only looks right while the dataset has five genres or fewer.
# Genres with the same movies_count have no defined relative order.
spark.sql("""
    SELECT genre, COUNT(movie_id) AS movies_count
    FROM movies
    GROUP BY genre
    ORDER BY movies_count DESC
    LIMIT 5
""").show()

+------+------------+
| genre|movies_count|
+------+------------+
| Drama|           6|
|Action|           6|
|Comedy|           4|
|Horror|           2|
|Sci-Fi|           2|
+------+------------+



In [12]:
# Find the actor with the most movies

In [15]:
# Actor with the most movies.
# The subquery counts movie_id values per actor_id in movie_actors and keeps the
# single highest count; the join then resolves that actor_id to a name.
# NOTE: because of LIMIT 1, if several actors share the highest count only one
# of them is returned.
top_actor_sql = """
    SELECT a.name, ma.count_movies
    FROM (
        SELECT actor_id, COUNT(movie_id) AS count_movies
        FROM movie_actors
        GROUP BY actor_id
        ORDER BY count_movies DESC
        LIMIT 1
    ) ma
    INNER JOIN actors a ON a.actor_id = ma.actor_id
"""
top_actor_df = spark.sql(top_actor_sql)
top_actor_df.show()

+--------+------------+
|    name|count_movies|
+--------+------------+
|Actor_17|           5|
+--------+------------+



In [16]:
# Average movie budget by genre

In [20]:
# Mean of the budget column for each genre, listed alphabetically by genre.
avg_budget_sql = """
    SELECT genre, AVG(budget) AS avg_budget
    FROM movies
    GROUP BY genre
    ORDER BY genre ASC
"""
avg_budget_by_genre_df = spark.sql(avg_budget_sql)
avg_budget_by_genre_df.show()

+------+--------------------+
| genre|          avg_budget|
+------+--------------------+
|Action|2.7492742561666667E7|
|Comedy|     5.20709662225E7|
| Drama| 6.076021856166667E7|
|Horror|      8.7281876775E7|
|Sci-Fi|       7.809715175E7|
+------+--------------------+



In [21]:
# Movies featuring more than one actor from the same country

In [29]:
# Movies in which more than one actor from the same country appears.
#
# The derived table `mc` holds one row per (movie_id, country) pair together
# with the number of distinct actors from that country cast in that movie:
#   * GROUP BY ... HAVING keeps only pairs with more than one actor, so no
#     de-duplication of the joined rows is needed afterwards;
#   * COUNT(DISTINCT a.actor_id) prevents a repeated (movie, actor) row in
#     movie_actors from being counted as a second actor;
#   * grouping on movie_id (not title) keeps two different movies that happen
#     to share a title as separate rows;
#   * sorting is done only in the outer query, which is the one place where it
#     determines the order of the displayed rows.
# Assumes movie_id is unique in movies, so the join to movies does not
# multiply rows (TODO confirm against the data).
# Rows that tie on (num_actors, country) have no defined relative order.
spark.sql("""
    SELECT m.title, mc.country, mc.num_actors
    FROM (
        SELECT
            ma.movie_id,
            a.country,
            COUNT(DISTINCT a.actor_id) AS num_actors
        FROM movie_actors ma
        INNER JOIN actors a ON a.actor_id = ma.actor_id
        GROUP BY ma.movie_id, a.country
        HAVING COUNT(DISTINCT a.actor_id) > 1
    ) mc
    INNER JOIN movies m ON m.movie_id = mc.movie_id
    ORDER BY mc.num_actors DESC, mc.country ASC
""").show()

+--------+---------+----------+
|   title|  country|num_actors|
+--------+---------+----------+
| Movie_1|    India|         3|
|Movie_18|Australia|         2|
| Movie_7|    India|         2|
|Movie_15|    India|         2|
|Movie_10|       UK|         2|
|Movie_10|      USA|         2|
| Movie_3|      USA|         2|
| Movie_7|      USA|         2|
| Movie_2|      USA|         2|
+--------+---------+----------+



In [None]:
# Stop the Spark session that was created at the top of the notebook.
spark.stop()