# **Distributed Database Management - Project A**
Technion – Israel Institute of Technology - Project Assignment 1 <br>
Shalom Mauda & Amit Machpud <br>

The assignment as a whole includes loading the data, analyzing and understanding it, drawing
conclusions, properly designing a distributed database and implementing the design using Spark. We
break the task down into several parts.
<br>


## **Background**
For this project assignment, you are asked to help the cinema company “Distributed Theaters”.
“Distributed Theaters” is the second-largest cinema company in Israel and operates 70 cinemas across the
country. Despite its size, it still aims to surpass the country's largest cinema company. To do so,
“Distributed Theaters” aims to re-design its database in order to maximize its efficiency and increase sales.
You are requested to help “Distributed Theaters” re-design its database.
<br>



## **Cinemas and Sites**

“Distributed Theaters” operates 70 cinemas across the country, distributed as follows: Haifa - 15 cinemas,
Tel Aviv - 17 cinemas, Jerusalem - 22 cinemas, Eilat - 12 cinemas, Tiberias - 4 cinemas.
In addition, each of these cities has a single computation unit, which you can refer to as a “Site” in
the terminology of our course, i.e. there are 5 sites.
All sites have equal computational capacity, and you can assume that each can store an unlimited
amount of data.
Furthermore, the sites are connected to each other in both directions, i.e. data can move from one site
to another and vice versa.
<br>


## **Clients**
Clients want to buy tickets for a movie they like and have the best time of their lives. To buy
a movie ticket, clients often use the Distributed Theaters web search system, which can display many
details about movies and showtimes. Each client search can be mapped to a query.
Clients can query the system from 5 different locations in the country: Haifa, Jerusalem, Tel Aviv, Eilat
and Gesher (Gesher is a kibbutz in the Beit She’an Valley in northeastern Israel; some say it’s the best
place in the entire world).
Note that a client's query might return an empty set of movies. However, as new movies arrive, such a
query might return movies later on.
<br>


---
## **Data Set**
The data in this project is based on The Movies Dataset from Kaggle. However, it has been heavily tweaked
and modified.

### Users
This source is provided within users.csv.
* user id - User’s unique identifier.
* user location - User’s location (city).

### Movies
This source is provided within movies.csv.
* movie id - Movie’s unique identifier.
* genres - The genres of the movie.
* overview - An overview of the movie’s plot.
* production companies - The production companies that were involved in the making of the movie.
* production countries - The countries in which the movie was produced.
* release date - The movie release date.
* revenue - The movie’s revenue.
* spoken languages - Languages in which the movie is available.
* tagline - The movie’s tagline.
* title - The title of the movie.
* cities - The cities in which the movie is available.



### Credits
This source is provided within credits.csv.
* cast - The cast of the movie that corresponds to the id field.
* crew - The crew of the movie that corresponds to the id field.
* id - Movie’s identifier (corresponding to ‘movie id’ in movies.csv).




### Queries
This source is provided within queries.csv.
* user id - User’s unique identifier.
* genres - List of genres the user is interested in; the movie's genres should include at least one of them.
* lang - List of languages; the movie should be available in at least one of them.
* actors - List of actors; all of them should appear in the movie.
* director - Movie’s director.
* cities - List of cities; the movie should be screened in at least one of them.
* country - List of countries; the movie should be produced in at least one of them.
* from realese date - Minimal release year of a movie that the user is interested in.
* production company - List of production companies; the movie should be produced by at least one of them.


### Tickets
This source is provided within tickets.csv.
* user id - User’s unique identifier.
* movie id - Movie’s unique identifier.
* number of tickets - Number of tickets the user bought.
* city - City where the show takes place.
* cinema id - Cinema’s unique identifier within a city.
<br>


---
# **Part A - Extract and Transform (15 points)**

Load data, preprocess it, apply transformations. Use Spark.

This part is preliminary to the analysis part and the implementation part.


## Initialize

* install findspark and pyspark
* initialize Spark


### Code

In [None]:
!pip install findspark
!pip install pyspark



In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

def init_spark(app_name: str):
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  sc = spark.sparkContext
  return spark, sc

spark, sc = init_spark('demo')

---
## Credits - Extract and Transform

Steps:

* the first step (loading the file) is similar to the code we were given
* remove special characters from the crew and cast records
* extract the two words following the `'Director'` job entry into a `director` column
* extract the two words following each `name` key in the cast into a list in an `actors` column

Data:

* From the crew column we keep only the director
* From the cast column we keep only the actors
* The rest of the data in those columns does not matter for the query analysis
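The regex steps above can be cross-checked with a plain-Python parser. This is a minimal sketch, not the code we ran. It assumes each `cast`/`crew` cell is a Python-literal list of dicts, as in the original Kaggle credits file. The sample cells and the helper names are hypothetical. Unlike the fixed two-word window used in the Spark code, this version keeps names of any length.

```python
import ast

# Hypothetical sample cells in the Python-literal style of the Kaggle
# credits file; real rows carry more keys per dict.
cast_cell = ("[{'cast_id': 14, 'character': 'Woody (voice)', 'name': 'Tom Hanks', 'order': 0}, "
             "{'cast_id': 15, 'character': 'Buzz Lightyear (voice)', 'name': 'Tim Allen', 'order': 1}]")
crew_cell = ("[{'department': 'Directing', 'job': 'Director', 'name': 'John Lasseter'}, "
             "{'department': 'Writing', 'job': 'Screenplay', 'name': 'Joss Whedon'}]")


def extract_actors(cast_str):
    """Return every actor name from a stringified cast list."""
    return [member["name"] for member in ast.literal_eval(cast_str)]


def extract_directors(crew_str):
    """Return the names of crew members whose job is 'Director'."""
    return [member["name"] for member in ast.literal_eval(crew_str)
            if member.get("job") == "Director"]


print(extract_actors(cast_cell))     # ['Tom Hanks', 'Tim Allen']
print(extract_directors(crew_cell))  # ['John Lasseter']
```

In Spark, such a function could be wrapped with `pyspark.sql.functions.udf(..., ArrayType(StringType()))`. We have not compared its speed with the regex approach.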

### Code

In [None]:
import re 
from pyspark.sql import functions as f

credits =  spark.read.format("csv")\
  .option("delimiter", "\t")\
  .option("header","true")\
  .option("inferSchema", "true")\
  .load("credits.csv")

prog = re.compile('\\[(.*?)\\]')
second_match = f.udf(lambda x: prog.findall(x)[1])
id_extract = f.udf(lambda x: x.split(",")[-1])

credits = credits\
  .withColumn("id", id_extract("cast,crew,id"))\
  .withColumn("cast", f.regexp_extract(f.col("cast,crew,id"), '\\[(.*?)\\]', 0))\
  .withColumn("crew", f.concat(f.lit("["),second_match("cast,crew,id"), f.lit("]")))\
  .select("cast", "crew", "id")
# cast 'id' to integer
credits = credits.withColumn("id", f.col("id").cast('int'))

In [None]:
# actors
to_remove = ["'cast_id': ", '[0-9]', r"\W+", "'character': ", ":", ",", r" \(voice\)", "'credit_id': "]
for string in to_remove:
  credits = credits.withColumn("cast", f.regexp_replace("cast", string, ' '))

noofwordstoextract = 2
credits = credits.withColumn("ArrayOfWords", f.split("cast"," "))\
.withColumn("actors", f.expr(f"""filter(transform(ArrayOfWords,(x,e)-> 
CASE WHEN x in ('name') 
THEN array_join(slice(ArrayOfWords,e+2,{noofwordstoextract}),' ') ELSE NULL END)
,y-> y is not NULL)""")).drop("ArrayOfWords").drop("cast")
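The `filter(transform(...))` expression is dense, and it is reused for directors, genres, companies, countries and languages. The sketch below shows in plain Python what it computes. `words_after_key` and the sample sentence are hypothetical and only illustrate the indexing. With `n = 2`, names longer than two words are truncated to their first two words.

```python
def words_after_key(words, key, n):
    """Plain-Python equivalent of the Spark SQL expression

        filter(transform(words, (x, e) -> CASE WHEN x IN (key)
               THEN array_join(slice(words, e + 2, n), ' ') ELSE NULL END),
               y -> y IS NOT NULL)

    transform() passes a 0-based index e, while slice() is 1-based, so
    slice(words, e + 2, n) starts at the word right after the key.
    """
    result = []
    for e, x in enumerate(words):
        if x == key:
            # Python slices are 0-based: the next word is at index e + 1.
            result.append(" ".join(words[e + 1:e + 1 + n]))
    return result


cleaned = "name Tom Hanks order name Tim Allen order".split(" ")
print(words_after_key(cleaned, "name", 2))  # ['Tom Hanks', 'Tim Allen']
```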

In [None]:
# directors
credits = credits.withColumn("crew", f.regexp_replace("crew", "'Director'", 'WilNeverBrAgain'))
credits = credits.withColumn("crew", f.regexp_replace("crew", r"\W+", ' '))
credits = credits.withColumn("crew", f.regexp_replace("crew", " name", ''))

noofwordstoextract = 2
credits = credits.withColumn("ArrayOfWords", f.split("crew"," "))\
.withColumn("director", f.expr(f"""filter(transform(ArrayOfWords,(x,e)-> 
CASE WHEN x in ('WilNeverBrAgain') 
THEN array_join(slice(ArrayOfWords,e+2,{noofwordstoextract}),' ') ELSE NULL END)
,y-> y is not NULL)""")).drop("ArrayOfWords").drop("crew")

### Output

In [None]:
credits.show(5)

+-----+--------------------+-----------------+
|   id|              actors|         director|
+-----+--------------------+-----------------+
|  862|[Tom Hanks, Tim A...|  [John Lasseter]|
| 8844|[Robin Williams, ...|   [Joe Johnston]|
|15602|[Walter Matthau, ...|  [Howard Deutch]|
|31357|[Whitney Houston,...|[Forest Whitaker]|
|11862|[Steve Martin, Di...|  [Charles Shyer]|
+-----+--------------------+-----------------+
only showing top 5 rows



---
## Movies - Extract and Transform

columns to extract and parse:
<br>
`'movie_id', 'genres', 'production_companies', 'production_countries', 'release_date', 'spoken_languages', 'cities'`

Steps:

* extract the data features from the .csv file
* cast id from string to int
* rename columns
* parse the `name` values out of the stringified lists for genres, production companies and countries
* parse the language codes and the city lists
* convert release_date to a date type

Data:

Irrelevant columns to drop:
* overview
* revenue
* tagline
* title
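A caveat about the steps above: for `prod_comp` and `countries`, the code removes every space before splitting. As a result, 'Pixar Animation Studios' becomes `PixarAnimationStudios` in the output, while query values such as 'Clavius Base' keep their spaces, so exact matching between the two may fail. The sketch below shows a parse that keeps spaces. It assumes the columns hold Python-literal lists of dicts, as in the Kaggle movies file. The sample cells and `values_for_key` are hypothetical.

```python
import ast

# Hypothetical cells in the Python-literal style of the Kaggle movies file.
companies_cell = "[{'name': 'Pixar Animation Studios', 'id': 3}]"
languages_cell = "[{'iso_639_1': 'en', 'name': 'English'}, {'iso_639_1': 'fr', 'name': 'Français'}]"


def values_for_key(cell, key="name"):
    """Return the values stored under `key` in a stringified list of dicts."""
    if not cell:  # missing cells become empty lists
        return []
    return [item[key] for item in ast.literal_eval(cell)]


print(values_for_key(companies_cell))               # ['Pixar Animation Studios']
print(values_for_key(languages_cell, "iso_639_1"))  # ['en', 'fr']
```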

### Code

In [None]:
# load Movies
path = 'movies.csv'
movies = spark.read.option("Header", True).option("multiline", "true").option("escape", "\"").csv(path)

# select columns: id, genres, prod_comp, countries, release_date, languages, cities
movies = movies.select('movie_id', 'genres', 'production_companies', 'production_countries', 'release_date', 'spoken_languages', 'cities')
movies = movies.withColumnRenamed('movie_id', 'id')
movies = movies.withColumnRenamed('production_companies', 'prod_comp')
movies = movies.withColumnRenamed('production_countries', 'countries')
movies = movies.withColumnRenamed('spoken_languages', 'languages')
# cast id column to int
movies = movies.withColumn("id", f.col("id").cast('int'))

In [None]:
# genres string to list
movies = movies.withColumn("genres", f.regexp_replace("genres", r'\W+', ' '))

noofwordstoextract = 1
movies = movies.withColumn("ArrayOfWords", f.split("genres"," "))\
.withColumn("genres_list", f.expr(f"""filter(transform(ArrayOfWords,(x,e)-> 
CASE WHEN x in ('name') 
THEN array_join(slice(ArrayOfWords,e+2,{noofwordstoextract}),' ') ELSE NULL END)
,y-> y is not NULL)""")).drop("ArrayOfWords").drop("genres")
movies = movies.withColumnRenamed('genres_list', 'genres')

In [None]:
# production companies (TODO: check companies whose names contain special characters)
movies = movies.withColumn("prod_comp", f.regexp_replace("prod_comp", ' ', ''))
movies = movies.withColumn("prod_comp", f.regexp_replace("prod_comp", r'\W+', ' '))

noofwordstoextract = 1
movies = movies.withColumn("ArrayOfWords", f.split("prod_comp"," "))\
.withColumn("prod_comp_list", f.expr(f"""filter(transform(ArrayOfWords,(x,e)-> 
CASE WHEN x in ('name') 
THEN array_join(slice(ArrayOfWords,e+2,{noofwordstoextract}),' ') ELSE NULL END)
,y-> y is not NULL)""")).drop("ArrayOfWords").drop("prod_comp")
movies = movies.withColumnRenamed('prod_comp_list', 'prod_comp')

In [None]:
# countries
movies = movies.withColumn("countries", f.regexp_replace("countries", ' ', ''))
movies = movies.withColumn("countries", f.regexp_replace("countries", r'\W+', ' '))

noofwordstoextract = 1
movies = movies.withColumn("ArrayOfWords", f.split("countries"," "))\
.withColumn("countries_2", f.expr(f"""filter(transform(ArrayOfWords,(x,e)-> 
CASE WHEN x in ('name') 
THEN array_join(slice(ArrayOfWords,e+2,{noofwordstoextract}),' ') ELSE NULL END)
,y-> y is not NULL)""")).drop("ArrayOfWords").drop("countries")
movies = movies.withColumnRenamed('countries_2', 'countries')

In [None]:
# normalize separators to '-' and parse release_date as a dd-MM-yyyy date
movies = movies.withColumn("release_date", f.regexp_replace("release_date", '/', '-'))
movies = movies.withColumn('release_date', f.to_date(f.col("release_date"),"dd-MM-yyyy"))

In [None]:
# languages
movies = movies.withColumn("languages", f.regexp_replace("languages", r'\W+', ' '))

noofwordstoextract = 1
movies = movies.withColumn("ArrayOfWords", f.split("languages"," "))\
.withColumn("languages2", f.expr(f"""filter(transform(ArrayOfWords,(x,e)-> 
CASE WHEN x in ('iso_639_1') 
THEN array_join(slice(ArrayOfWords,e+2,{noofwordstoextract}),' ') ELSE NULL END)
,y-> y is not NULL)""")).drop("ArrayOfWords").drop("languages")
movies = movies.withColumnRenamed('languages2', 'languages')

In [None]:
# cities 
movies = movies.withColumn("cities", f.regexp_replace("cities", r'\[', ''))
movies = movies.withColumn("cities", f.regexp_replace("cities", r'\]', ''))
movies = movies.withColumn("cities", f.regexp_replace("cities", '\'', ''))
movies = movies.withColumn('cities', f.split(f.col("cities"),", "))

### Output

In [None]:
movies.show(5)

+-----+------------+--------------------+--------------------+--------------------+--------------------+---------+
|   id|release_date|              cities|              genres|           prod_comp|           countries|languages|
+-----+------------+--------------------+--------------------+--------------------+--------------------+---------+
|  862|  1995-10-30|[Eilat, Tel Aviv,...|[Animation, Comed...|[PixarAnimationSt...|[UnitedStatesofAm...|     [en]|
| 8844|  1995-12-15|[Jerusalem, Tiber...|[Adventure, Fanta...|[TriStarPictures,...|[UnitedStatesofAm...| [en, fr]|
|15602|  1995-12-22|[Eilat, Haifa, Te...|   [Romance, Comedy]|[WarnerBros, Lanc...|[UnitedStatesofAm...|     [en]|
|31357|  1995-12-22|[Tel Aviv, Tiberi...|[Comedy, Drama, R...|[TwentiethCentury...|[UnitedStatesofAm...|     [en]|
|11862|  1995-02-10|[Haifa, Jerusalem...|            [Comedy]|[SandollarProduct...|[UnitedStatesofAm...|     [en]|
+-----+------------+--------------------+--------------------+--------------------+--------------------+---------+
only showing top 5 rows

---
## Users - Extract and Transform

* extract the data features from the .csv file
* cast id from string to int
* rename the user_id column to id

### Code

In [None]:
# load Users
path = 'users.csv'
users = spark.read.option("Header", True).option("multiline", "true").option("escape", "\"").csv(path)
users = users.withColumn("id", f.col("user_id").cast('int')).drop('user_id')
users = users.select('id', 'user_location')

### Output

In [None]:
users.show(5)
users.schema

+---+-------------+
| id|user_location|
+---+-------------+
|  1|        Eilat|
|  2|     Tel Aviv|
|  3|        Eilat|
|  4|    Jerusalem|
|  5|Kibuts Gesher|
+---+-------------+
only showing top 5 rows



StructType(List(StructField(id,IntegerType,true),StructField(user_location,StringType,true)))

---
## Queries - Extract and Transform

Steps:

* extract all data features from the .csv file
* cast id from string to int
* for every feature, remove the square brackets and apostrophes
* split each record into a list, or cast it to int for the date
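A plain-Python sketch of the strip-and-split steps above (our illustration, not the Spark code itself) reveals two side effects. First, an empty list `[]` becomes `['']` rather than an empty array. Second, apostrophes inside values are removed too. The sample inputs are hypothetical.

```python
import re


def spark_style_split(cell):
    """Mimic the Spark steps: drop '[', ']' and apostrophes, then split on ', '."""
    stripped = re.sub(r"[\[\]']", "", cell)
    return stripped.split(", ")


print(spark_style_split("['Haifa', 'Tiberias']"))  # ['Haifa', 'Tiberias']
print(spark_style_split("[]"))                     # [''] (one empty string, not an empty list)
print(spark_style_split("['Peter O'Toole']"))      # ['Peter OToole']
```

The `['']` case is why a later emptiness check has to look at the first element instead of the list length.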


### Code

In [None]:
# load queries
path = 'queries.csv'
queries = spark.read.option("Header", True).option("multiline", "true").option("escape", "\"").csv(path)
queries = queries.withColumnRenamed('from_realese_date', 'date').withColumnRenamed('production_company', 'company')

# cast id to int
queries = queries.withColumn("id", f.col("user_id").cast('int')).drop('user_id')

In [None]:
feathers = ['genres', 'lang', 'actors', 'director', 'cities', 'country', 'date', 'company']
for feather in feathers:
    for special_char in [r'\[', r'\]', "'"]:
        queries = queries.withColumn(feather, f.regexp_replace(feather, special_char, ''))
    if feather != 'date':
        queries = queries.withColumn(feather, f.split(f.col(feather), ", "))
    else:
        queries = queries.withColumn(feather, f.col(feather).cast('int'))

In [None]:
# select columns
queries = queries.select('id', 'genres', 'lang', 'actors', 'director', 'cities', 'country', 'date', 'company')

### Output

In [None]:
queries.show(5)

+----+--------------------+-------------------+--------------------+--------------+--------------------+------------------+----+--------------------+
|  id|              genres|               lang|              actors|      director|              cities|           country|date|             company|
+----+--------------------+-------------------+--------------------+--------------+--------------------+------------------+----+--------------------+
| 981|  [Western, Mystery]|  [English, Srpski]|                  []|[Nae Caranfil]|   [Haifa, Tiberias]|                []|2012|[Katakuri-ke no K...|
|3775|   [Action, Western]|          [English]|                  []|            []|          [Tel Aviv]|                []|2013|[Clavius Base, Tr...|
|4095|             [Crime]|[English, עִבְרִית]|[Kenneth Alton, J...|            []|         [Jerusalem]|[Belgium, Moldova]|1995|[Peter Carsten Pr...|
|3363|[Animation, Western]|          [English]|                  []|[Philip Dunne]|             [Hai

---
## Tickets - Extract and Transform

Steps:

* Extract the data features from the .csv file
* Cast ids and number of tickets to int
* Reorder columns

Data:

Irrelevant columns to drop:
* cinema_id

### Code

In [None]:
# tickets
path = 'tickets.csv'
tickets = spark.read.option("Header", True).option("multiline", "true").option("escape", "\"").csv(path)

tickets = tickets.withColumn("user_id", f.col("user_id").cast('int'))
tickets = tickets.withColumn("movie_id", f.col("movie_id").cast('int'))
tickets = tickets.withColumn("n_tickets", f.col("number_of_tickets").cast('int'))

tickets = tickets.select('user_id', 'movie_id', 'n_tickets', 'city')

### Output

In [None]:
tickets.show(5)

+-------+--------+---------+--------+
|user_id|movie_id|n_tickets|    city|
+-------+--------+---------+--------+
|      1|     110|        1|   Eilat|
|      1|     147|        1|Tiberias|
|      1|     858|        2|Tel Aviv|
|      1|    1246|        1|   Eilat|
|      1|    1968|        1|   Eilat|
+-------+--------+---------+--------+
only showing top 5 rows



---
# **Part B - Data Analysis (25 points)**

Analyze the dataset and derive at least 4 insights. Use Spark.
Insights should refer to points that will help design the database in the next section.


You should mainly consider the users' behaviour in terms of ticket buying and the nature of the queries,
but feel free to include additional aspects. A great opportunity to let your creativity flourish.


As mentioned earlier, a client’s query might return an empty set of movies. However, as new movies
arrive, such a query might return movies later on. Therefore, your analysis should still consider
such queries and not ignore them.


---
## First look at the data

Conduct a preliminary analysis of the data and draw basic conclusions.

### Queries

* there are 100,000 queries in total

Count the number of queries coming from each city:

* Kibuts Gesher - 5,288
* Haifa - 21,652
* Jerusalem - 30,873
* Tel Aviv - 24,839
* Eilat - 17,348

### Users

* there are 10,656 users in total

Count the number of users from each city:

* Kibuts Gesher - 576
* Haifa - 2,324
* Jerusalem - 3,263
* Tel Aviv - 2,643
* Eilat - 1,850

### Tickets

* 850,462 tickets were bought in total

Count the number of tickets bought by users from each city (grouped by the buyer's location):

* Kibuts Gesher - 27,915
* Haifa - 228,375
* Jerusalem - 315,926
* Tel Aviv - 140,203
* Eilat - 138,043
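The ticket totals above sum `n_tickets` instead of counting rows, because one row (an order) can hold several tickets. They are also grouped by the buyer's home city (`user_location`), not by the `city` where the show takes place. Here is a small pure-Python illustration on the five sample rows shown in the tickets output of Part A:

```python
from collections import defaultdict

# The first five rows of tickets.csv as shown in the Part A output:
# (user_id, movie_id, n_tickets, city)
rows = [
    (1, 110, 1, "Eilat"),
    (1, 147, 1, "Tiberias"),
    (1, 858, 2, "Tel Aviv"),
    (1, 1246, 1, "Eilat"),
    (1, 1968, 1, "Eilat"),
]

n_orders = len(rows)                 # number of rows, like groupBy().count()
n_tickets = sum(r[2] for r in rows)  # total tickets, like groupBy().sum('n_tickets')

# Tickets per show city (the `city` column), not per buyer location.
tickets_by_city = defaultdict(int)
for _, _, n, city in rows:
    tickets_by_city[city] += n

print(n_orders, n_tickets)    # 5 6
print(dict(tickets_by_city))  # {'Eilat': 3, 'Tiberias': 1, 'Tel Aviv': 2}
```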


Users from Jerusalem buy the most tickets, as can be seen from the output.


### Movies

* there are 45,460 movie records in total
* 63.23% of movies have English as a spoken language
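Converting the per-city counts into shares makes them easier to compare. With the numbers above, Jerusalem accounts for roughly 31% of users and of queries, but about 37% of tickets. The dictionaries in this minimal sketch copy the counts listed above.

```python
# Per-city counts copied from the first-look outputs.
queries_per_city = {"Kibuts Gesher": 5288, "Haifa": 21652, "Jerusalem": 30873,
                    "Tel Aviv": 24839, "Eilat": 17348}
users_per_city = {"Kibuts Gesher": 576, "Haifa": 2324, "Jerusalem": 3263,
                  "Tel Aviv": 2643, "Eilat": 1850}
tickets_per_city = {"Kibuts Gesher": 27915, "Haifa": 228375, "Jerusalem": 315926,
                    "Tel Aviv": 140203, "Eilat": 138043}


def shares(counts):
    """Return each city's share of the total, in percent (2 decimals)."""
    total = sum(counts.values())
    return {city: round(100 * n / total, 2) for city, n in counts.items()}


for name, counts in [("queries", queries_per_city), ("users", users_per_city),
                     ("tickets", tickets_per_city)]:
    print(name, sum(counts.values()), shares(counts))
```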

<br>


### Code


In [None]:
# queries
users_id = queries.select('id')
# full outer join the queries' user ids with users to get each query's user location
users_location = users_id.join(users, users_id[0] == users[0], 'fullouter').select('user_location')
queries_count = users_location.groupBy().count()
queries_count_per_city = users_location.groupBy('user_location').count()

In [None]:
# users
users_count = users.groupBy().count()
users_count_per_city = users.groupBy('user_location').count()

In [None]:
# number of ticket orders (rows) per user location; the next cell sums n_tickets instead
tickets_count = tickets.select('user_id', 'n_tickets')
tickets_location = tickets_count.join(users, tickets_count[0] == users[0], 'fullouter').select('user_location', 'n_tickets')
tickets_count = tickets_location.groupBy().count()
tickets_count_per_city = tickets_location.groupBy('user_location').count()

In [None]:
tickets_count = tickets.select('user_id', 'n_tickets')
tickets_location = tickets_count.join(users, tickets_count[0] == users[0], 'fullouter').select('user_location', 'n_tickets')
tickets_count_per_city = tickets_location.groupBy(f.col('user_location')).sum('n_tickets')
tickets_count = tickets_count_per_city.select('sum(n_tickets)').groupBy().sum()

In [None]:
# movies language English ratio
movies_count = movies.groupBy().count().collect()[0][0]
lang_filter = f.udf(lambda x: 1 if 'en' in x else 0)
english_count = movies.select('languages').withColumn('en', lang_filter('languages')
                    .cast('int')).select('en').groupBy().sum()
english_ratio = english_count.collect()[0][0] / movies_count

### Output

In [None]:
# count queries for each city
queries_count_per_city.show(5)
queries_count.show(5)

+-------------+-----+
|user_location|count|
+-------------+-----+
|Kibuts Gesher| 5288|
|        Haifa|21652|
|    Jerusalem|30873|
|     Tel Aviv|24839|
|        Eilat|17348|
+-------------+-----+

+------+
| count|
+------+
|100000|
+------+



In [None]:
# count users for each city
users_count_per_city.show(5)
users_count.show(5)

+-------------+-----+
|user_location|count|
+-------------+-----+
|Kibuts Gesher|  576|
|        Haifa| 2324|
|    Jerusalem| 3263|
|     Tel Aviv| 2643|
|        Eilat| 1850|
+-------------+-----+

+-----+
|count|
+-----+
|10656|
+-----+



In [None]:
# count tickets for each city
tickets_count_per_city.show(5)
tickets_count.show(5)

+-------------+--------------+
|user_location|sum(n_tickets)|
+-------------+--------------+
|Kibuts Gesher|         27915|
|        Haifa|        228375|
|    Jerusalem|        315926|
|     Tel Aviv|        140203|
|        Eilat|        138043|
+-------------+--------------+

+-------------------+
|sum(sum(n_tickets))|
+-------------------+
|             850462|
+-------------------+



In [None]:
print(f"English ratio in movies: {round(english_ratio, 4)*100}%")
print(f"movies count: {movies_count}")

English ratio in movies: 63.23%
movies count: 45460


---
## User locations searching for movies by city

Count the number of queries from each user location that request each city

* join cities with users
* replace the id column with the user's location
* group by user location to get a matrix of user location by requested city
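The steps above amount to a one-hot flag per city followed by a grouped sum. Below is a minimal pure-Python sketch of that pivot on hypothetical rows. In Spark, the per-city UDFs could likely be replaced by the built-in `pyspark.sql.functions.array_contains(col, value)` (cast to int), which avoids Python UDF overhead. We have not measured the difference.

```python
from collections import Counter

CITIES = ["Tel Aviv", "Jerusalem", "Haifa", "Tiberias", "Eilat"]

# Hypothetical (user_location, requested cities) pairs, i.e. queries already joined with users.
query_rows = [
    ("Jerusalem", ["Jerusalem", "Tel Aviv"]),
    ("Jerusalem", ["Jerusalem"]),
    ("Kibuts Gesher", ["Tiberias", "Haifa"]),
    ("Eilat", ["Eilat"]),
]


def location_city_matrix(rows):
    """Per user location, count how many queries mention each cinema city."""
    matrix = {}
    for location, cities in rows:
        counter = matrix.setdefault(location, Counter())
        for city in CITIES:
            # 1/0 flag per city, then summed, like the UDF columns + groupBy().sum()
            counter[city] += int(city in cities)
    return matrix


m = location_city_matrix(query_rows)
print(m["Jerusalem"]["Tel Aviv"], m["Jerusalem"]["Jerusalem"])  # 1 2
```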




### Code


In [None]:
# select columns from queries
citiesdf = queries.select('cities', 'id')

# split cities into columns: 1 if the query requests the city, else 0
for city in ['Tel Aviv', 'Jerusalem', 'Haifa', 'Tiberias', 'Eilat']:
  city_filter = f.udf(lambda x: 1 if city in x else 0)
  citiesdf = citiesdf.withColumn(city, city_filter("cities"))

# select columns
citiesdf = citiesdf.select('id', 'Tel Aviv', 'Jerusalem', 'Haifa', 'Tiberias', 'Eilat')

In [None]:
# change user id column to user location
# reload the raw users file (columns: user_id, user_location); later cells rely on this column order
path = 'users.csv'
users = spark.read.option("Header", True).option("multiline", "true").option("escape", "\"").csv(path)

# full outer join cities and users
users_cities = citiesdf.join(users, citiesdf[0] == users[0], 'fullouter').select('user_location', 'Tel Aviv', 'Jerusalem', 'Haifa', 'Tiberias', 'Eilat')

# cast numbers to int format
feathers = ['Tel Aviv', 'Jerusalem', 'Haifa', 'Tiberias', 'Eilat']
for feather in feathers:
  users_cities = users_cities.withColumn(feather, f.col(feather).cast('int'))

In [None]:
locations_matrix = users_cities
# group by user location
locations_matrix = locations_matrix.groupBy(f.col('user_location')).sum('Tel Aviv', 'Jerusalem', 'Haifa', 'Tiberias', 'Eilat')

### Output

In [None]:
# show results
locations_matrix.show(5)

+-------------+-------------+--------------+----------+-------------+----------+
|user_location|sum(Tel Aviv)|sum(Jerusalem)|sum(Haifa)|sum(Tiberias)|sum(Eilat)|
+-------------+-------------+--------------+----------+-------------+----------+
|Kibuts Gesher|            0|             0|      1060|         5288|         0|
|        Haifa|         1294|             0|     21652|         4306|         0|
|    Jerusalem|        18588|         30873|         0|            0|         0|
|     Tel Aviv|        24839|          3688|         0|            0|         0|
|        Eilat|            0|             0|         0|            0|     17348|
+-------------+-------------+--------------+----------+-------------+----------+



## Insight 1

* **Eilat** - all queries from Eilat search only for movies in Eilat
* **Tel Aviv** - 14.8% of the queries from Tel Aviv also search for movies in Jerusalem
* **Jerusalem** - 60.2% of the queries from Jerusalem also search for movies in Tel Aviv
* **Haifa** - 6% of the queries from Haifa also search for movies in Tel Aviv, and 20% in Tiberias
* **Kibuts Gesher** - all queries from Kibuts Gesher search for movies in Tiberias, and 20% also in Haifa

Every query includes the user's own city (for Kibuts Gesher, which has no cinema, the nearby Tiberias), so users always search for movies close to home.
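The percentages in Insight 1 are each cell divided by that location's total number of queries. Because a query can list several cities, a row's percentages can add up to more than 100%. This sketch copies the locations_matrix output and the per-location query counts from the first-look section. The `home_city` mapping is our own labeling.

```python
# Counts copied from the locations_matrix output:
# user location -> number of queries that include each cinema city.
matrix = {
    "Kibuts Gesher": {"Tel Aviv": 0, "Jerusalem": 0, "Haifa": 1060, "Tiberias": 5288, "Eilat": 0},
    "Haifa": {"Tel Aviv": 1294, "Jerusalem": 0, "Haifa": 21652, "Tiberias": 4306, "Eilat": 0},
    "Jerusalem": {"Tel Aviv": 18588, "Jerusalem": 30873, "Haifa": 0, "Tiberias": 0, "Eilat": 0},
    "Tel Aviv": {"Tel Aviv": 24839, "Jerusalem": 3688, "Haifa": 0, "Tiberias": 0, "Eilat": 0},
    "Eilat": {"Tel Aviv": 0, "Jerusalem": 0, "Haifa": 0, "Tiberias": 0, "Eilat": 17348},
}
# Total queries per user location (from the first-look section).
queries_per_location = {"Kibuts Gesher": 5288, "Haifa": 21652, "Jerusalem": 30873,
                        "Tel Aviv": 24839, "Eilat": 17348}
# Home cinema city per location (Kibuts Gesher has no cinema, so we map it to Tiberias).
home_city = {"Kibuts Gesher": "Tiberias", "Haifa": "Haifa", "Jerusalem": "Jerusalem",
             "Tel Aviv": "Tel Aviv", "Eilat": "Eilat"}


def percent_of_queries(location, city):
    """Share (in %, one decimal) of a location's queries that include `city`."""
    return round(100 * matrix[location][city] / queries_per_location[location], 1)


print(percent_of_queries("Tel Aviv", "Jerusalem"))  # 14.8
print(percent_of_queries("Jerusalem", "Tel Aviv"))  # 60.2
# Every query includes the home city:
print(all(matrix[loc][home_city[loc]] == queries_per_location[loc] for loc in matrix))  # True
```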

---
## User locations searching for movies by genre

Count the number of queries from each user location that request each genre

* from queries, select genres and id
* join genres with users
* replace the id column with the user location
* get the user location by genre matrix grouped by user location
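The helper `get_distinct_list_of_lists` used for this step checks membership in a Python list, so each lookup scans the values found so far. A dict-based version returns the same first-seen order with constant-time lookups. Below is a minimal sketch of that alternative together with the one-hot step. The sample genre lists are taken from the first rows of the queries output in Part A, and the helper names are hypothetical.

```python
def distinct_in_order(list_of_lists):
    """Order-preserving de-duplication via dict keys (constant-time membership)."""
    return list(dict.fromkeys(item for sub in list_of_lists for item in sub))


def one_hot(values, vocabulary):
    """1/0 flag per vocabulary entry, like the per-genre UDF columns."""
    return {v: int(v in values) for v in vocabulary}


genre_lists = [["Western", "Mystery"], ["Action", "Western"], ["Crime"]]
vocab = distinct_in_order(genre_lists)
print(vocab)                                  # ['Western', 'Mystery', 'Action', 'Crime']
print(one_hot(["Action", "Western"], vocab))  # {'Western': 1, 'Mystery': 0, 'Action': 1, 'Crime': 0}
```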





### Code

In [None]:
# select colmns from queries
genres_df = queries.select('genres', 'id')

def get_distinct_list_of_lists(list_of_lists):
  res_list = []
  for list_ in list_of_lists:
    for element in list_:
      if element not in res_list:
        res_list.append(element)
  return res_list

tmp = genres_df.rdd.map(lambda x: x[0]).collect()
genres = get_distinct_list_of_lists(tmp)
genres = genres[:-1]
# split genres into columns: 1 if the query requests the genre, else 0
for genre in genres:
  genre_filter = f.udf(lambda x: 1 if genre in x else 0)
  genres_df = genres_df.withColumn(genre, genre_filter("genres"))
genres_df = genres_df.drop('genres')

In [None]:
# full outer join genres and users
users_genres = genres_df.join(users, genres_df[0] == users[0], 'fullouter')
users_genres = users_genres.drop('id')
# cast genres to int format
for genre in genres:
  users_genres = users_genres.withColumn(genre, f.col(genre).cast('int'))
cols = list(users_genres.columns)
cols = [cols[-1]] + cols[:-2]
users_genres = users_genres.select(cols)

# group by user location
locations_genres = users_genres.groupBy(f.col('user_location')).sum()

In [None]:
# average look-up rate per genre for each location, then averaged over locations
genres_avg = users_genres.groupBy(f.col('user_location')).avg()
genres_avg = genres_avg.groupBy().avg()

### Output

In [None]:
# show results
locations_genres.show(5)
genres_avg.show(5)

+-------------+------------+------------+-----------+----------+--------------+----------------+-------------+------------+--------------+----------+------------+-----------+--------------------+-----------+--------+-------------+------------+-----------+------------+----------+
|user_location|sum(Western)|sum(Mystery)|sum(Action)|sum(Crime)|sum(Animation)|sum(Documentary)|sum(TV Movie)|sum(Fantasy)|sum(Adventure)|sum(Drama)|sum(Romance)|sum(Family)|sum(Science Fiction)|sum(Horror)|sum(War)|sum(Thriller)|sum(History)|sum(Comedy)|sum(Foreign)|sum(Music)|
+-------------+------------+------------+-----------+----------+--------------+----------------+-------------+------------+--------------+----------+------------+-----------+--------------------+-----------+--------+-------------+------------+-----------+------------+----------+
|Kibuts Gesher|         174|         171|        170|       159|           153|            2591|          130|         150|           146|       144|         14

## Insight 2

As can be seen from the first table (locations_genres):

Interest is spread almost uniformly across genres, with a few exceptions:
* **Haifa** - Haifa users show notably more interest in *Drama* movies than in other genres
* **Tel Aviv** - Tel Aviv users show notably more interest in *Action* movies than in other genres
* **Kibuts Gesher** - Kibuts Gesher users show notably more interest in *Family* and *Documentary* movies than in other genres

We can see from the second table (genres_avg) that:

* Drama and Action are the most popular genres.
* Documentary and Family are the next most popular.




---
## Release date per city from queries

Count the number of queries from each user location by requested minimal release year

* from queries, select the date (minimal release year) and id
* join the dates with users
* replace the id column with the user location
* get the user location by release year matrix grouped by user location

### Code

In [None]:
# select columns from queries
date_df = queries.select('date', 'id')

def get_distinct_list(list_):
  res_list = []
  for element in list_:
    if element not in res_list:
      res_list.append(element)
  return sorted(res_list)

# get dates list
tmp = date_df.rdd.map(lambda x: x[0]).collect()
dates = get_distinct_list(tmp)

# split dates into columns: 1 if the query's minimal release year equals that year, else 0
for date in dates:
  date_filter = f.udf(lambda x: 1 if date == int(x) else 0)
  date_df = date_df.withColumn(str(date), date_filter("date"))
date_df = date_df.drop('date')

In [None]:
# full outer join dates and users
users_dates = date_df.join(users, date_df[0] == users[0], 'fullouter')
users_dates = users_dates.drop('id')

# reorder columns
cols = list(users_dates.columns)
cols = [cols[-1]] + cols[:-2]
users_dates = users_dates.select(cols)

# cast dates to int format
for date in dates:
  users_dates = users_dates.withColumn(str(date), f.col(str(date)).cast('int'))

# group by user location
locations_dates = users_dates.groupBy(f.col('user_location')).sum()

In [None]:
# average look-up rate per year for each location, then averaged over locations
dates_avg = users_dates.groupBy(f.col('user_location')).avg()
dates_avg = dates_avg.groupBy().avg()

### Output

In [None]:
# show results
locations_dates.show()
dates_avg.show(5)

+-------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|user_location|sum(1990)|sum(1991)|sum(1992)|sum(1993)|sum(1994)|sum(1995)|sum(1996)|sum(1997)|sum(1998)|sum(1999)|sum(2000)|sum(2001)|sum(2002)|sum(2003)|sum(2004)|sum(2005)|sum(2006)|sum(2007)|sum(2008)|sum(2009)|sum(2010)|sum(2011)|sum(2012)|sum(2013)|sum(2014)|sum(2015)|sum(2016)|sum(2017)|sum(2018)|
+-------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|Kibuts Gesher|      201|      229|      270|      241|      220|      212|      2

## Insight 3

As can be seen from the first table (locations_dates):

* **Haifa** and **Tel Aviv** users never search for movies released before 2010
* **Kibuts Gesher** users always look for movies released after 2012
<br>

We can see from the second table (dates_avg) that:

* Interest in movies released from 2012 onward increases sharply
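Statements such as "never before 2010" are per-location minima. These can be checked directly rather than read off one column per year. Below is a minimal pure-Python sketch on hypothetical rows. In Spark, the equivalent would be a `groupBy('user_location').min('date')` on the queries joined with users; we did not run it here.

```python
# Hypothetical (user_location, from_realese_date) pairs from queries joined with users.
rows = [("Haifa", 2012), ("Haifa", 2015), ("Tel Aviv", 2010),
        ("Kibuts Gesher", 2013), ("Jerusalem", 1995)]


def earliest_year_by_location(pairs):
    """Smallest requested minimal release year per user location."""
    earliest = {}
    for location, year in pairs:
        earliest[location] = min(year, earliest.get(location, year))
    return earliest


print(earliest_year_by_location(rows))
# {'Haifa': 2012, 'Tel Aviv': 2010, 'Kibuts Gesher': 2013, 'Jerusalem': 1995}
```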



---
## Tickets per city
Find out how many tickets users from each location bought in each city

* Join tickets with users
* select city, n_tickets, user_id
* replace id column with user location
* group by user location and get a location matrix

### Code

In [None]:
tickets_number = tickets.select('user_id', 'city', 'n_tickets')
tickets_number = tickets_number.join(users, tickets_number[0] == users[0], 'fullouter').select('user_location', 'city', 'n_tickets')

# split cities into columns: n_tickets if the show is in that city, else 0
for city in ['Tel Aviv', 'Jerusalem', 'Haifa', 'Tiberias', 'Eilat']:
  city_filter = f.udf(lambda x, y: x if city == y else 0)
  tickets_number = tickets_number.withColumn(city, city_filter('n_tickets', 'city'))

# cast numbers to int format
feathers = ['Tel Aviv', 'Jerusalem', 'Haifa', 'Tiberias', 'Eilat']
for feather in feathers:
  tickets_number = tickets_number.withColumn(feather, f.col(feather).cast('int'))

# group by user_location and sum all cities
tickets_number = tickets_number.drop('city', 'n_tickets').groupBy('user_location').sum('Tel Aviv', 'Jerusalem', 'Haifa', 'Tiberias', 'Eilat')

### Output

In [None]:
tickets_number.show(10)

+-------------+-------------+--------------+----------+-------------+----------+
|user_location|sum(Tel Aviv)|sum(Jerusalem)|sum(Haifa)|sum(Tiberias)|sum(Eilat)|
+-------------+-------------+--------------+----------+-------------+----------+
|Kibuts Gesher|         5612|          5357|      5658|         5404|      5884|
|        Haifa|        45384|         44004|     45815|        45223|     47949|
|    Jerusalem|        63329|         60898|     63469|        62225|     66005|
|     Tel Aviv|        27819|         27146|     28559|        27617|     29062|
|        Eilat|        27205|         26927|     27725|        27174|     29012|
+-------------+-------------+--------------+----------+-------------+----------+



## Insight 4

* Tickets are bought in roughly equal numbers in all five cities
* Users from every location buy tickets in all of the cities, not only in their own
* Eilat sells slightly more tickets than any other city, for every user location
* Users from 'Kibuts Gesher' buy by far the fewest tickets
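To put numbers on "roughly equal", the column and row totals of the table above can be computed directly. This plain-Python snippet only re-keys the printed values; by these totals each city accounts for between about 19.3% (Jerusalem) and 20.9% (Eilat) of all tickets.

```python
# Ticket counts copied from the output table above: rows are user locations,
# columns are the cities where the tickets were bought.
CITIES = ['Tel Aviv', 'Jerusalem', 'Haifa', 'Tiberias', 'Eilat']
TABLE = {
    'Kibuts Gesher': [5612, 5357, 5658, 5404, 5884],
    'Haifa':         [45384, 44004, 45815, 45223, 47949],
    'Jerusalem':     [63329, 60898, 63469, 62225, 66005],
    'Tel Aviv':      [27819, 27146, 28559, 27617, 29062],
    'Eilat':         [27205, 26927, 27725, 27174, 29012],
}

# tickets sold per cinema city (column totals) and bought per user location (row totals)
city_totals = {c: sum(row[i] for row in TABLE.values()) for i, c in enumerate(CITIES)}
location_totals = {loc: sum(row) for loc, row in TABLE.items()}
grand_total = sum(city_totals.values())

# share of all tickets sold in each city
city_shares = {c: t / grand_total for c, t in city_totals.items()}

for c in CITIES:
    print(f'{c:10s} {city_totals[c]:7d} {city_shares[c]:.1%}')
print('most tickets sold in:', max(city_totals, key=city_totals.get))
print('fewest tickets bought by:', min(location_totals, key=location_totals.get))
```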


---
## Cities search for actors and director

Check which cities search for actors and directors

* select 'id', 'actors', 'director' from queries
* join with users
* replace the id column with the user location
* group by user location and average a 0/1 flag per field, i.e. the fraction of queries that search for actors and for directors
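Averaging a 0/1 flag gives the share of queries where the flag is 1. A plain-Python sketch of this grouping on hypothetical rows (the row format and the helper `search_rates` are assumptions for illustration; as in the Spark UDF, a field counts as searched only when its first entry is non-empty):

```python
from collections import defaultdict

def search_rates(rows):
    """Fraction of queries per location that search for actors / directors.

    rows: iterable of (user_location, actors, director), where actors and
    director are lists of search strings (hypothetical format); a missing,
    empty, or empty-first-entry list means the field was not searched.
    """
    counts = defaultdict(lambda: [0, 0, 0])  # [n_queries, n_actor, n_director]
    for location, actors, director in rows:
        c = counts[location]
        c[0] += 1
        c[1] += int(bool(actors) and bool(actors[0]))
        c[2] += int(bool(director) and bool(director[0]))
    # the mean of a 0/1 flag is the share of queries where the flag is 1
    return {loc: (a / n, d / n) for loc, (n, a, d) in counts.items()}

# hypothetical example queries
rows = [
    ('Haifa', [''], ['Nolan']),
    ('Haifa', ['Tom Hanks'], ['Spielberg']),
    ('Eilat', [''], ['']),
    ('Eilat', [], None),
]
rates = search_rates(rows)
print(rates)
```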



### Code

In [None]:
import pyspark.sql.functions as f

cities_crew = queries.select('id', 'actors', 'director')

# join queries with users to get the location of each query
cities_crew = cities_crew.join(users, cities_crew[0] == users[0], 'fullouter').select('user_location', 'actors', 'director')

# UDF: 1 if the query searched this field (non-empty first entry), else 0;
# null or empty values count as "not searched"
searched = f.udf(lambda x: 0 if x is None or len(x) == 0 or len(x[0]) == 0 else 1, 'int')

# add a 0/1 column per field (the UDF already returns int, so no cast is needed)
cities_crew = cities_crew.withColumn('is_actors', searched('actors'))
cities_crew = cities_crew.withColumn('is_director', searched('director'))

# the average of a 0/1 flag is the fraction of queries that search for that field
cities_crew = cities_crew.select('user_location', 'is_actors', 'is_director')
cities_crew = cities_crew.groupBy('user_location').avg('is_actors', 'is_director')

### Output

In [None]:
cities_crew.show()

+-------------+------------------+--------------------+
|user_location|    avg(is_actors)|    avg(is_director)|
+-------------+------------------+--------------------+
|Kibuts Gesher|0.4975416036308623|0.028555219364599094|
|        Haifa|0.4978754849436542|   0.919083687419176|
|    Jerusalem|0.9508308230492664| 0.03128947624137596|
|     Tel Aviv|0.5045694271105922|0.030717822778694795|
|        Eilat|               0.0| 0.03066635923449389|
+-------------+------------------+--------------------+



## Insight 5

* About 92% of the queries from Haifa search for directors
* In all other cities only about 3% of the queries search for directors
* Queries from Eilat never search for actors
* About 50% of the queries from 'Kibuts Gesher', 'Haifa' and 'Tel Aviv' search for actors
* About 95% of the queries from 'Jerusalem' search for actors

---
# **Conclusions**

# **Main conclusions**


**Horizontal**  

1. **Eilat** - all queries from Eilat search only for movies in Eilat
1. **Tel Aviv** - 14.8% of the queries from Tel-Aviv search for movies in Jerusalem
1. **Jerusalem** - 60.2% of the queries from Jerusalem search for movies in Tel-Aviv
1. **Haifa** - 6% of the queries from Haifa search for movies in Tel-Aviv, and 20% for movies in Tiberias
1. **Kibuts Gesher** - 20% of the queries from Kibuts Gesher search for movies in Haifa
1. **Haifa** - There is a high degree of interest in *Drama* movies in Haifa compared to other genres
1. **Tel Aviv** - There is a high degree of interest in *Action* movies in Tel Aviv compared to other genres
1. **Kibuts Gesher** - There is a high degree of interest in *Family* and *Documentary* movies in Kibuts Gesher compared to other genres
1. **Haifa** and **Tel Aviv** users never search for movies released before 2010 

**Vertical**

1. **Eilat** - users do not search for actors at all
1. In all cities except Haifa, only about 3% of the queries search for directors
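The two kinds of fragmentation listed above can be illustrated on a hypothetical toy table (the rows, column names and helper functions are assumptions for the example, not the project's schema): a horizontal fragment keeps whole rows that satisfy a predicate, a vertical fragment keeps a subset of columns plus the key, and the original table must be recoverable by a union of the horizontal fragments and by a join of the vertical fragments on the key.

```python
# Hypothetical toy "movies" table: each row is a dict keyed by column name.
movies = [
    {'id': 1, 'title': 'A', 'cities': ['Eilat'], 'director': 'X', 'actors': ['P']},
    {'id': 2, 'title': 'B', 'cities': ['Haifa', 'Tiberias'], 'director': 'Y', 'actors': ['Q']},
    {'id': 3, 'title': 'C', 'cities': ['Tel Aviv'], 'director': 'Z', 'actors': []},
]

def horizontal_fragment(rows, predicate):
    """Horizontal fragmentation: keep whole rows that satisfy the predicate."""
    return [r for r in rows if predicate(r)]

def vertical_fragment(rows, columns, key='id'):
    """Vertical fragmentation: keep only some columns, always including the key."""
    keep = [key] + [c for c in columns if c != key]
    return [{c: r[c] for c in keep} for r in rows]

# horizontal: complementary city predicates, so every row lands in some fragment
eilat_rows = horizontal_fragment(movies, lambda r: 'Eilat' in r['cities'])
other_rows = horizontal_fragment(movies, lambda r: 'Eilat' not in r['cities'])

# vertical: split off the crew columns (queries from Eilat do not search for actors)
base = vertical_fragment(movies, ['title', 'cities'])
crew = vertical_fragment(movies, ['director', 'actors'])

# reconstruction: union of the horizontal fragments, join of the vertical ones on 'id'
rebuilt_h = sorted(eilat_rows + other_rows, key=lambda r: r['id'])
crew_by_id = {r['id']: r for r in crew}
rebuilt_v = [{**b, **crew_by_id[b['id']]} for b in base]

print(rebuilt_h == movies, rebuilt_v == movies)
```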

---
# **Part D - Deploy via Spark (15 points)**


In [None]:
import os

In [None]:
# create directory for each site
sites = ["Haifa", "Tel_Aviv", "Jerusalem", "Eilat", "Tiberias"]
for site in sites:
  try: 
    os.makedirs(site) 
  except OSError as error: 
    print(error) 

[Errno 17] File exists: 'Tel_Aviv'
[Errno 17] File exists: 'Jerusalem'
[Errno 17] File exists: 'Eilat'
[Errno 17] File exists: 'Tiberias'


In [None]:
# start from the full movies table
df = movies
# add column year from release_date
df = df.withColumn('year', f.year('release_date'))

# join df with credits
credits_ = credits.withColumnRenamed('id', 'id2')
df = df.join(credits_, df[0] == credits_[0]).drop('id2')

In [None]:
# Haifa
haifa_df = df
# filter only movies available in Tiberias or Haifa
haifa_df = haifa_df.filter(f.array_contains(f.col('cities'), 'Haifa')\
                           | f.array_contains(f.col('cities'), 'Tiberias'))
# keep only movies released in 2012 or later
haifa_df = haifa_df.filter(f.col("year") >= 2012)


# get credits df only in haifa movies
# haifa_credits = credits.join(haifa_df.select('id'), haifa_df[0] == credits[0], 'left_semi')

# split Drama movies into their own fragment; the rest go to haifa_rest
haifa_genre = haifa_df.filter(f.array_contains(f.col('genres'), 'Drama'))
haifa_rest = haifa_df.subtract(haifa_genre)

In [None]:
# Tel-Aviv
telaviv_df = df.drop('director')
# filter only movies available in Tel-Aviv or Jerusalem
telaviv_df = telaviv_df.filter(f.array_contains(f.col('cities'), 'Tel Aviv')\
                           | f.array_contains(f.col('cities'), 'Jerusalem'))
# keep only movies released in 2012 or later
telaviv_df = telaviv_df.filter(f.col("year") >= 2012)


# get credits df only in Tel-Aviv movies
telaviv_credits = credits.join(telaviv_df.select('id'), telaviv_df[0] == credits[0], 'left_semi')
telaviv_director = telaviv_credits.select('id', 'director')

# split Action and Drama movies into their own fragment; the rest go to telaviv_rest
telaviv_genre = telaviv_df.filter(f.array_contains(f.col('genres'), 'Action')\
                               | f.array_contains(f.col('genres'), 'Drama'))
telaviv_rest = telaviv_df.subtract(telaviv_genre)

In [None]:
# Jerusalem
jerusalem_df = df.drop('director')
# filter only movies available in Tel-Aviv or Jerusalem
jerusalem_df = jerusalem_df.filter(f.array_contains(f.col('cities'), 'Tel Aviv')\
                           | f.array_contains(f.col('cities'), 'Jerusalem'))


# get credits df only in Jerusalem movies
jerusalem_credits = credits.join(jerusalem_df.select('id'), jerusalem_df[0] == credits[0], 'left_semi')
jerusalem_director = jerusalem_credits.select('id', 'director')

In [None]:
# Eilat
eilat_df = df.drop('director', 'actors')
# keep only movies available in Eilat
eilat_df = eilat_df.filter(f.array_contains(f.col('cities'), 'Eilat'))


# get credits df only in Eilat movies
eilat_credits = credits.join(eilat_df.select('id'), eilat_df[0] == credits[0], 'left_semi')

In [None]:
# Tiberias
tiberias_df = df.drop('director')
# keep only movies available in Haifa or Tiberias
tiberias_df = tiberias_df.filter(f.array_contains(f.col('cities'), 'Haifa')\
                           | f.array_contains(f.col('cities'), 'Tiberias'))

# get credits df only in Tiberias movies
tiberias_credits = credits.join(tiberias_df.select('id'), tiberias_df[0] == credits[0], 'left_semi')
tiberias_director = tiberias_credits.select('id', 'director')

# split Documentary and Family movies into their own fragment; the rest go to tiberias_rest
tiberias_genre = tiberias_df.filter(f.array_contains(f.col('genres'), 'Documentary')\
                               | f.array_contains(f.col('genres'), 'Family'))
tiberias_rest = tiberias_df.subtract(tiberias_genre)
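Taken together, the cells above give each site a row predicate. Restating those filters as plain Python predicates (a hypothetical summary for illustration; `SITE_PREDICATES`, `any_city` and `sites_for` are not part of the notebook, and a movie is assumed to be a dict with a `cities` list and an integer `year`) makes the replication visible: the Haifa fragment is a subset of the Tiberias fragment, the Tel Aviv fragment is a subset of the Jerusalem fragment, and a movie with an empty `cities` list matches no site at all.

```python
def any_city(movie, cities):
    """True if the movie is screened in at least one of the given cities."""
    return any(c in movie['cities'] for c in cities)

# hypothetical restatement of the row filters applied per site above
SITE_PREDICATES = {
    'Haifa':     lambda m: any_city(m, ['Haifa', 'Tiberias']) and m['year'] >= 2012,
    'Tel_Aviv':  lambda m: any_city(m, ['Tel Aviv', 'Jerusalem']) and m['year'] >= 2012,
    'Jerusalem': lambda m: any_city(m, ['Tel Aviv', 'Jerusalem']),
    'Eilat':     lambda m: any_city(m, ['Eilat']),
    'Tiberias':  lambda m: any_city(m, ['Haifa', 'Tiberias']),
}

def sites_for(movie):
    """Return the sites whose horizontal fragment would contain this movie."""
    return [site for site, pred in SITE_PREDICATES.items() if pred(movie)]

print(sites_for({'cities': ['Haifa'], 'year': 2015}))
print(sites_for({'cities': ['Haifa'], 'year': 2010}))
print(sites_for({'cities': [], 'year': 2020}))
```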

### cast to string and export to CSV

In [None]:
import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, MapType, StructType

def write_site_csv(df, path):
  """Cast every nested (array/map/struct) column of df to string, since CSV
  cannot store nested types, and write the result to path."""
  for field in df.schema.fields:
    if isinstance(field.dataType, (ArrayType, MapType, StructType)):
      df = df.withColumn(field.name, f.col(field.name).cast('string'))
  df.write.csv(path)

In [None]:
# Haifa
write_site_csv(haifa_genre, 'Haifa/haifa_genre.csv')
write_site_csv(haifa_rest, 'Haifa/haifa_rest.csv')

In [None]:
# Tel-Aviv
write_site_csv(telaviv_genre, 'Tel_Aviv/telaviv_genre.csv')
write_site_csv(telaviv_rest, 'Tel_Aviv/telaviv_rest.csv')
write_site_csv(telaviv_credits, 'Tel_Aviv/telaviv_credits.csv')

In [None]:
# Jerusalem
write_site_csv(jerusalem_df, 'Jerusalem/jerusalem_df.csv')
write_site_csv(jerusalem_credits, 'Jerusalem/jerusalem_credits.csv')

In [None]:
# Eilat
write_site_csv(eilat_df, 'Eilat/eilat_df.csv')
write_site_csv(eilat_credits, 'Eilat/eilat_credits.csv')

In [None]:
# Tiberias
write_site_csv(tiberias_genre, 'Tiberias/tiberias_genre.csv')
write_site_csv(tiberias_rest, 'Tiberias/tiberias_rest.csv')
write_site_csv(tiberias_credits, 'Tiberias/tiberias_credits.csv')

### Find lost data

In [None]:
# count how many records have been lost, i.e. movies that are not stored on any site
on_site = tiberias_df.select('id')\
        .union(eilat_df.select('id'))\
        .union(jerusalem_df.select('id'))\
        .union(telaviv_df.select('id'))\
        .union(haifa_df.select('id')).distinct()

all_id = movies.select('id').distinct()

# all movies minus the movies found on some site
diff = all_id.subtract(on_site)
print(f'number of lost records: {diff.count()}')

number of lost records: 466
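The completeness check reduces to a set difference, and its direction matters: lost records are the IDs in the source table minus the IDs found on any site. A plain-Python sketch on hypothetical IDs (the helper `lost_ids` and the sample sets are illustrative only):

```python
def lost_ids(all_ids, fragments):
    """IDs present in the source table but in none of the site fragments.

    The reverse difference (fragments minus all_ids) would instead find IDs
    that appear on a site without existing in the source table.
    """
    on_site = set().union(*fragments)
    return set(all_ids) - on_site

# hypothetical IDs: movie 4 is not stored on any site
all_ids = {1, 2, 3, 4, 5}
fragments = [{1, 2}, {2, 3}, {5}]
print(len(lost_ids(all_ids, fragments)))
```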


In [None]:
print(f"Tiberias count: {tiberias_df.select('id').distinct().count()}")
print(f"Eilat count: {eilat_df.select('id').distinct().count()}")
print(f"Jerusalem count: {jerusalem_df.select('id').distinct().count()}")
print(f"Tel-Aviv count: {telaviv_df.select('id').distinct().count()}")
print(f"Haifa count: {haifa_df.select('id').distinct().count()}")

Tiberias count: 43160
Eilat count: 33914
Jerusalem count: 43226
Tel-Aviv count: 9158
Haifa count: 9139
