# Porsche Assginment

#### imports

In [1]:
import os
import json
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import split, explode
from pyspark.sql.functions import collect_list

#### starting spark session

In [2]:
spark = SparkSession.builder.appName('Movies').getOrCreate()

#### data location

Current solution expect that data is exported in folder 'data'.

In [3]:
data = 'data//'

#### load all data

In [4]:
def get_all_files(path_m, extension="csv"):
    file_names = []
    for root, dirs, files in os.walk(path_m):
        for name in files:
            name_ext = name.split(".")[-1]
            if name_ext == extension:
                file_names.append(os.path.join(root, name))
            
    return file_names

all_files = get_all_files(data)

 #### loading data to dataframe
Since data is not well formated, given that it has bad-json values with single quotes insead of double qoutes, and somewhere it has double quotes around these bad-jsons, and even somwhere for values inside bad-json it has a pair of double qoutes as a qoutation... and since as of yet, spark csv cannot be separated by regex - we do not read this file as csv but as a text file.

In [5]:
def read_all_files_into_df(all_files):
    df = spark.read.text(all_files)
    header=df.first()[0]
    schema=header.split(",")

    df = df.filter(~col("value").contains(header))
    
    sep = r',(?! )'
    new_sep = ";;;"
    df = df.withColumn('separated_values', regexp_replace(('value'), sep, new_sep))

    for i in range(len(schema)):
        df = df.withColumn(schema[i], split(col("separated_values"), new_sep).getItem(i))
    df = df[schema]
    
    return df

In [6]:
def prepare_df_for_json(dft, col='cast'):
    dft2 = dft.withColumn(col, regexp_replace('cast', '""', ""))
    dft2 = dft2.withColumn(col, regexp_replace('cast', "\"\[\{", '\[\{'))
    dft2 = dft2.withColumn(col, regexp_replace('cast', "\}\]\"", '\}\]'))
    dft2 = dft2.withColumn(col, regexp_replace('cast', '"', ""))
    
    dft2 = dft.withColumn(col, regexp_replace('cast', "\[\{'", '\[\{"'))
    dft2 = dft2.withColumn(col, regexp_replace('cast', "'\}\]", '"\}\]'))
    dft2 = dft2.withColumn(col, regexp_replace('cast', "\{'", '\{"'))
    dft2 = dft2.withColumn(col, regexp_replace('cast', "\': \'", '": "'))
    
    dft2 = dft2.withColumn(col, regexp_replace('cast', "\', \'", '", "'))
    dft2 = dft2.withColumn(col, regexp_replace('cast', ", \'", ', "'))
    dft2 = dft2.withColumn(col, regexp_replace('cast', "\': ", '": '))
    dft2 = dft2.withColumn(col, regexp_replace('cast', "'\}", '"\}'))

    dft2 = dft2.withColumn(col, regexp_replace('cast', '"\[\{', '\[\{'))
    dft2 = dft2.withColumn(col, regexp_replace('cast', '\}\]"', '\}\]'))
    dft2 = dft2.withColumn(col, regexp_replace('cast', '""', '`'))
    dft2 = dft2.withColumn(col, regexp_replace('cast', '`', '"'))
    
    dft2 = dft2.withColumn(col, regexp_replace('cast', ': None', ': ""'))
    
    return dft2

#### UDFs

Given more time, reading these jsons, can be improved in a lots of ways.

In [7]:
json_schema = ArrayType(StructType([
    StructField("id",StringType(),True), 
    StructField("name",StringType(),True)
  ]))

json_schema_ids = ArrayType(
   StringType(),True
  )

def parse_json(array_str):
    try:
        json_obj = json.loads(array_str)
        for item in json_obj:
            try:
                yield (item["id"].strip(), item["name"].strip())
            except:
                yield ("Null", "Null")
    except:
        yield ("Null", "Null")
        

def fake_parse_jsons(array_str):
    try:
        k = array_str.split("}, {")
        for i in k:
            t = i.split(", ")
            for p in t:
                if '"name"' in p:
                    name = p.split(": ")[1]#[1,-1]
                if '"id"' in p:
                    ids = p.split(": ")[1]
            yield (ids.strip(), name.strip())
    except:
        yield None
        #yield ("Null", "Null")
        
udf_parse_json = udf(lambda lst: parse_json(lst), json_schema)
udf_parse_fake_json = udf(lambda lst: fake_parse_jsons(lst), json_schema) 

### Load data to dataframe, and preparing for task1 and task2

In [8]:
df = read_all_files_into_df(all_files)

df = df.na.drop()

df = prepare_df_for_json(df, 'cast')
df = df.withColumn("cast_id_name", udf_parse_fake_json(df.cast))

df = prepare_df_for_json(df, 'crew')
df = df.withColumn("crew_id_name", udf_parse_fake_json(df.crew))

In [9]:
#Task includes cast and crew, it can be added.

df = df.withColumn("cast_id_name", array_distinct(concat(col("cast_id_name"), col("crew_id_name"))))

## First task can be resolved as this dataframe. But we can also give a very large dict.
Task 1 is to extract data about people from cast and crew on movie set. Data is in bad-json inside csvs.

In [10]:
df_cast_full = df[["cast_id_name"]]

dft2_exploded = df_cast_full.withColumn('cast_id_name_single', explode('cast_id_name'))

df_id_name = dft2_exploded.withColumn('id', col('cast_id_name_single')["id"]).withColumn("name", col('cast_id_name_single')["name"])[['id',"name"]]


In [11]:
df_id_name.show(10)

+------+-------------------+
|    id|               name|
+------+-------------------+
| 67228| "Kathryn Beaumont"|
| 67290|     "Verna Felton"|
|  5833|          "Ed Wynn"|
| 29283|    "Richard Haydn"|
| 34759|"Sterling Holloway"|
|142527|    "Jerry Colonna"|
| 22602|  "J. Pat O'Malley"|
| 67230|    "Bill Thompson"|
| 93897|    "Heather Angel"|
|132709|    "Joseph Kearns"|
+------+-------------------+
only showing top 10 rows



In [12]:
#df_id_name = df_id_name.dropDuplicates()
#df_id_name.show(10)

In [13]:
#Currently not working, maybe due to too much data, but true python dictionary for single node would be get in this way:


# id_dict = df_id_name.toPandas().set_index('id').T.to_dict('list')

## Second task is to get index with movie ids and actor ids

In [14]:
df_cast = df[["id", "cast_id_name"]]
df_cast = df_cast.withColumn("actor_ids", df_cast.cast_id_name.id)
#df_cast.show(10)

In [15]:
dft2_exploded = df_cast.withColumn('actor_id', explode('actor_ids'))
#dft2_exploded.show(10)

In [16]:
df_movies = dft2_exploded[["id", "actor_id"]]
#df_movies.show(10)

In [17]:
df_movies = df_movies.na.drop()

In [18]:
df_movies = df_movies.dropDuplicates()

In [19]:
#grouped_df = df_movies.groupby('actor_id').agg(collect_set('id').alias("ids"))
grouped_df = df_movies.groupby('actor_id').agg(collect_list('id').alias("ids"))
grouped_df.show(10)

+--------+--------------------+
|actor_id|                 ids|
+--------+--------------------+
|     100|[834, 18165, 4911...|
|   10000|[855, 54845, 319,...|
| 1000007|[96771, 162928, 1...|
| 1000061|             [36799]|
| 1000083|[17058, 596, 3769...|
| 1000145|            [136336]|
| 1000152|            [169726]|
| 1000195|[104221, 108213, ...|
| 1000197|             [17185]|
|  100022|            [280617]|
+--------+--------------------+
only showing top 10 rows



# Conclusion

Things that should be fixed:

    1. loading json data
    
    2. add option automatily download data given the link to data online
    
    3. task1: enable collecting data and export dictionary (not as dataframe, but as python dict)
    
    
    

Solved:

    4. task2: edit data, drop null data
    
    5. add crew data (currently there is some bug when concatenating cast data and crew data) 

In [20]:
print('PySpark Version :'+spark.version)

PySpark Version :3.3.0
