# Extract Data from TMDB API into Spark DF
Load the necessary libraries. We use `requests` to call the TMDB API and `pyspark` to load the response into a DataFrame.

In [7]:
import os

import numpy as np
import pandas as pd
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    BooleanType,
    ArrayType,
    DoubleType,
    IntegerType,
)

# Opt in to pandas' Copy-on-Write mode for the rest of the session.
pd.set_option("mode.copy_on_write", True)

In [None]:
# Load the first page of TMDB "discover" results (sorted by popularity, descending).
#
# The API read token is taken from the TMDB_API_TOKEN environment variable so
# the credential is never stored in the notebook. A KeyError here means the
# variable has not been exported in the kernel's environment.
# NOTE(review): the token that used to be embedded in this cell should be
# treated as leaked and rotated.
tmdb_token = os.environ["TMDB_API_TOKEN"]

url = "https://api.themoviedb.org/3/discover/movie?include_adult=false&include_video=false&language=en-US&page=1&sort_by=popularity.desc"
headers = {
    "accept": "application/json",
    "Authorization": f"Bearer {tmdb_token}",
}

# A timeout keeps the cell from hanging indefinitely on a stalled connection.
response = requests.get(url, headers=headers, timeout=30)
# Surface HTTP failures (bad token, rate limiting) here instead of as a
# confusing KeyError on "results" below.
response.raise_for_status()

results = response.json()["results"]
results[:3]

[{'adult': False,
  'backdrop_path': '/3V4kLQg0kSqPLctI5ziYWabAZYF.jpg',
  'genre_ids': [878, 28, 12],
  'id': 912649,
  'original_language': 'en',
  'original_title': 'Venom: The Last Dance',
  'overview': "Eddie and Venom are on the run. Hunted by both of their worlds and with the net closing in, the duo are forced into a devastating decision that will bring the curtains down on Venom and Eddie's last dance.",
  'popularity': 3752.76,
  'poster_path': '/aosm8NMQ3UyoBVpSxyimorCQykC.jpg',
  'release_date': '2024-10-22',
  'title': 'Venom: The Last Dance',
  'video': False,
  'vote_average': 6.491,
  'vote_count': 843},
 {'adult': False,
  'backdrop_path': '/iR79ciqhtaZ9BE7YFA1HpCHQgX4.jpg',
  'genre_ids': [27, 9648],
  'id': 1100782,
  'original_language': 'en',
  'original_title': 'Smile 2',
  'overview': 'About to embark on a new world tour, global pop sensation Skye Riley begins experiencing increasingly terrifying and inexplicable events. Overwhelmed by the escalating horrors and t

In [None]:
# Turn the first page of API results into a Spark DataFrame.
spark = SparkSession.builder.appName("CreateTableDatabricks").getOrCreate()

# (column name, Spark type) for each field kept from the TMDB payload.
# Every column is declared nullable.
# NOTE(review): the sample payload also carries a 'video' key that is not
# listed here — confirm the omission is intentional.
column_types = [
    ("adult", BooleanType()),
    ("backdrop_path", StringType()),
    ("genre_ids", ArrayType(IntegerType())),
    ("id", IntegerType()),
    ("original_language", StringType()),
    ("original_title", StringType()),
    ("overview", StringType()),
    ("popularity", DoubleType()),
    ("poster_path", StringType()),
    ("release_date", StringType()),
    ("title", StringType()),
    ("vote_average", DoubleType()),
    ("vote_count", IntegerType()),
]
schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in column_types]
)

# Distribute the list of result dicts, then apply the explicit schema.
rdd = spark.sparkContext.parallelize(results)
spark_df = spark.createDataFrame(rdd, schema)

In [None]:
# Preview the title and rating columns for the first ten rows.
preview_columns = ["original_title", "popularity", "vote_average", "vote_count"]
spark_df.select(*preview_columns).show(10)

+--------------------+----------+------------+----------+
|      original_title|popularity|vote_average|vote_count|
+--------------------+----------+------------+----------+
|Venom: The Last D...|   3752.76|       6.491|       843|
|             Smile 2|  2711.905|         6.9|       619|
|      The Wild Robot|  2542.661|         8.5|      3084|
|        Gladiator II|  2139.537|         6.8|       668|
|         Terrifier 3|  2051.426|         6.9|      1084|
|Apocalipsis Z: el...|  1814.827|       6.745|       545|
|              Levels|  1634.969|       5.875|        20|
|              Wicked|  1485.249|         7.9|       164|
|Deadpool & Wolverine|  1423.522|       7.689|      5581|
|       The Substance|  1348.076|         7.3|      2181|
+--------------------+----------+------------+----------+
only showing top 10 rows



In [9]:
# Snapshot the most recently fetched results to a local CSV via pandas.
df = pd.DataFrame(data=results)
df.to_csv(path_or_buf="movies.csv")

# TODO: Load data into Databricks

I can think of two ways to do this: (1) load the values into Databricks as comma-separated payloads, or (2) load the Spark DataFrame into Databricks directly. This notebook should be a good starting point for either method. If we choose (1), pandas might be easier to work with than PySpark.

In [None]:
## Template to Load data into Databricks
# Walks every result page of the TMDB "discover" endpoint and builds one Spark
# DataFrame per page, ready to be pushed to Databricks.
#
# Requires from earlier cells: `response` (first-page reply, for the page
# count) and `schema` (the StructType describing one movie record).

# Read the API token from the environment rather than embedding it in the notebook.
headers = {
    "accept": "application/json",
    "Authorization": "Bearer " + os.environ["TMDB_API_TOKEN"],
}

# Loop-invariant setup: getOrCreate() returns the already-running session, and
# the schema from the earlier cell is reused instead of being rebuilt per page.
spark = SparkSession.builder.appName("CreateTableDatabricks").getOrCreate()

# TMDB rejects `page` values above 500, so cap the advertised page count to
# avoid error responses that have no "results" key.
MAX_TMDB_PAGE = 500
num_pgs = min(response.json()["total_pages"], MAX_TMDB_PAGE)

for page in range(1, num_pgs + 1):
    # Parentheses make the two-part URL a single expression (the dangling `+`
    # used previously was a syntax error).
    url = (
        "https://api.themoviedb.org/3/discover/movie?include_adult=false&include_video=false&language=en-US&"
        f"page={page}&sort_by=popularity.desc"
    )
    response = requests.get(url, headers=headers, timeout=30)
    # Stop on HTTP errors instead of failing later with a KeyError.
    response.raise_for_status()
    results = response.json()["results"]

    # create spark df for this page
    rdd = spark.sparkContext.parallelize(results)
    spark_df = spark.createDataFrame(rdd, schema)
    # TODO: send each page of data to databricks incrementally
