In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext 
from pyspark.sql.window import Window
from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType
from pyspark.sql import functions as F

In [3]:
spark = SparkSession.builder.appName("Kranio")\
    .config('spark.master','local[3]')\
    .config('spark.shuffle.sql.partitions',1)\
    .getOrCreate()

In [4]:
df_movies = spark.read.csv("C:/Users/Usurio/Desktop/kranio/movies.csv",header=True)

In [5]:
df_movies.printSchema()

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- credits: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- recommendations: string (nullable = true)



In [6]:
df_movies = df_movies.withColumn('budget', F.col('budget').cast(DoubleType())).withColumn('revenue', F.col('revenue').cast(DoubleType()))\
            .withColumn('release_date', F.col('release_date').cast(DateType()))

In [7]:
df_movies = df_movies.withColumn("profitable",((F.col("revenue")/F.col("budget"))*100))
df_movies = df_movies.withColumn("profitable", F.round(df_movies['profitable'],1))
df_movies = df_movies.withColumn("year", F.year('release_date')).withColumn('month', F.month('release_date'))

In [8]:
df_movies = df_movies.withColumn('semester', F.when(F.col('month').between(1,6),'1')\
            .when(F.col('month').between(7,12),'2'))

In [29]:
P1= df_movies.groupBy('year','semester').agg(
    F.max(F.struct('profitable', 'title')).alias('col')
).select('year','semester', 'col.*')

In [54]:
P2= df_movies.groupBy('year','genres').agg(
    F.mean('profitable').alias('Avg profitable')
).na.drop(subset=["Avg profitable"]).filter(F.col('Avg profitable')!=0)

In [96]:
fivegenres = ['Drama-Romance','Thriller-Drama','Comedy-Action','Drama-Horror','Horror']

P4 = df_movies.groupBy('genres','title').agg(
    F.mean('profitable').alias('avg profitable')
)

window = Window.partitionBy(P4['genres']).orderBy(P4['avg profitable'].desc())
P3 = P4.select('title','genres','avg profitable', F.rank().over(window).alias('rank'))\
.filter((F.col('rank') <= 10) & (F.col('Avg profitable')!=0) & (F.col('genres').isin(fivegenres)))

In [125]:
P4= df_movies.groupBy('month').agg(
    F.mean('popularity').alias('Avg popularity'))

In [126]:
P5 = df_movies.groupBy('year','genres').agg(
      F.count('release_date').alias('Total release_date')
).filter((F.col('year')> F.year(F.current_date())-6) & (F.col('year')< F.year(F.current_date()))).orderBy('year')

In [131]:
P1.repartition(1).write.csv("C:/Users/Usurio/Desktop/kranio/datap1.csv")
P2.repartition(1).write.csv("C:/Users/Usurio/Desktop/kranio/datap2.csv")
P3.repartition(1).write.csv("C:/Users/Usurio/Desktop/kranio/datap3.csv")
P4.repartition(1).write.csv("C:/Users/Usurio/Desktop/kranio/datap4.csv")
P5.repartition(1).write.csv("C:/Users/Usurio/Desktop/kranio/datap5.csv")