In [0]:
# Register the ADLS Gen2 account key with this Spark session so abfss:// paths
# on that storage account can be read. The key comes from a Databricks secret
# scope rather than being written into the notebook.
spark.conf.set(
    "fs.azure.account.key.<storage_account>.dfs.core.windows.net",
    dbutils.secrets.get(scope="<secretscope>", key="<accesskey>"),
)

In [0]:
# Load the source JSON from ADLS Gen2 into a DataFrame. The storage account in
# this path must be the one whose access key was configured on the session.
source_path = "abfss://<container>@<storage-account>.dfs.core.windows.net/<file-path>"
json_df = spark.read.json(source_path)

In [0]:
from pyspark.sql.functions import explode, col

# Explode the `results` array so that each element becomes its own row. All
# original top-level columns are kept next to the new `exploded_results` column.
flattened_df = json_df.selectExpr("*", "explode(results) AS exploded_results")

# Fields of each exploded element that are promoted to top-level columns as-is.
result_fields = [
    "adult",
    "backdrop_path",
    "id",
    "media_type",
    "name",
    "original_language",
    "original_name",
    "first_air_date",
    "original_title",
    "overview",
    "popularity",
    "poster_path",
    "release_date",
    "title",
    "video",
    "vote_average",
    "vote_count",
]

# Project the nested fields into flat columns. `origin_country` is cast to a
# string exactly once and emitted directly under its final name as the last
# column, instead of being round-tripped through a temporary
# `origin_country_str` column and cast a second time.
# (presumably origin_country is an array in the source JSON; the string cast
# gives it a flat, CSV-friendly representation — confirm against the schema)
csv_df = flattened_df.select(
    *[f"exploded_results.{field}" for field in result_fields],
    col("exploded_results.origin_country").cast("string").alias("origin_country"),
)

In [0]:
from pyspark.sql.functions import col, coalesce

# Columns that are no longer needed once the combined columns exist.
obsolete_columns = [
    'first_air_date', 'release_date', 'original_name', 'origin_country',
    'original_title', 'title', 'video', 'backdrop_path',
]

# coalesce() takes the first non-null value of its arguments, so:
#   Name          <- title, falling back to name
#   Release_Dates <- first_air_date, falling back to release_date
# NOTE(review): 'Name' differs from the existing 'name' column only by case.
# With Spark's default case-insensitive resolution withColumn should replace
# that column rather than add a second one — confirm for this cluster's
# spark.sql.caseSensitive setting.
csv_df = (
    csv_df
    .withColumn('Name', coalesce(col('title'), col('name')))
    .withColumn('Release_Dates', coalesce(col('first_air_date'), col('release_date')))
    .drop(*obsolete_columns)
)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lit, trim

# Plain-Python helper that turns a poster_path value into a full image URL.
def add_prefix(path):
    """Return the w500 TMDB image URL for a single ``poster_path`` value.

    Args:
        path: The stored poster path, e.g. ``"abc.jpg"`` or ``"/abc.jpg"``.
            Surrounding whitespace is ignored. ``None`` is allowed.

    Returns:
        The prefixed URL string, or ``None`` when ``path`` is ``None``
        (previously this raised ``AttributeError``).
    """
    if path is None:
        return None
    # Strip whitespace, then any leading slashes, so exactly one '/' separates
    # the prefix from the file name whether or not the value starts with one.
    return 'https://image.tmdb.org/t/p/w500/' + path.strip().lstrip('/')

# Build the full image URL with Spark built-in functions (no Python UDF).
# regexp_replace is imported here because this cell's import line lacks it.
from pyspark.sql.functions import regexp_replace

# Leading slashes are removed from the trimmed path so that exactly one '/'
# separates the prefix from the file name, whether or not the stored value
# starts with one (TMDB poster paths appear to — TODO confirm against the data).
# concat() yields null when any input is null, so rows without a poster keep a
# null poster_path instead of receiving a bare prefix.
poster_file = regexp_replace(trim(csv_df['poster_path']), '^/+', '')
csv_df = csv_df.withColumn('poster_path', concat(lit('https://image.tmdb.org/t/p/w500/'), poster_file))

In [0]:
# Final output layout: (source column, output column name) pairs, listed in the
# order the columns should appear in the written table.
output_columns = [
    ("id", "ID"),
    ("Name", "Names"),
    ("Release_Dates", "Release_Dates"),
    ("media_type", "Media_Type"),
    ("adult", "Adult"),
    ("original_language", "Original_Language"),
    ("overview", "Overview"),
    ("popularity", "Popularity"),
    ("vote_average", "Vote_Average"),
    ("vote_count", "Vote_Count"),
    ("poster_path", "Poster_Path"),
]

# Keep only the listed columns, renamed and reordered as specified above.
csv_df = csv_df.select(
    *[col(source).alias(target) for source, target in output_columns]
)

In [0]:
# Connection settings for the target Azure SQL Server database.
jdbcHostname = "<sqlsrv-name>.database.windows.net"
jdbcPort = "1433"
jdbcDatabase = "<db-name>"

# JDBC connection properties. The SQL login is read from the Databricks secret
# scope (the same mechanism used for the storage account key) so the user name
# and password never appear in the notebook source or its revision history.
# Replace the placeholder key names with the keys that hold these two values.
properties = {
    "user": dbutils.secrets.get(scope="<secretscope>", key="<sql-username-key>"),
    "password": dbutils.secrets.get(scope="<secretscope>", key="<sql-password-key>"),
}

In [0]:
 # Assemble the SQL Server JDBC connection string from the host, port and
 # database name defined in the settings cell.
 jdbc_url_template = "jdbc:sqlserver://{0}:{1};database={2}"
 url = jdbc_url_template.format(jdbcHostname, jdbcPort, jdbcDatabase)

In [0]:
#connecting to sql server and define the table name.
from pyspark.sql import *
import pandas as pd
# Obtain the writer through the DataFrame's public `write` accessor instead of
# constructing a DataFrameWriter by hand. The `myfinaldf` name is kept bound
# to the writer in case other cells refer to it.
myfinaldf = csv_df.write

# Write the DataFrame to the "Movies" table over JDBC. mode="overwrite"
# replaces any data already in that table.
myfinaldf.jdbc(url=url, table="Movies", mode="overwrite", properties=properties)

