In [None]:
# Mount the raw-data Blob container into DBFS using an account key.
# The key is read from a Databricks secret scope instead of being pasted into the notebook.
# For PROD it is recommended to use a service principal (OAuth) instead of an account key.
# NOTE(review): the cells below read from /mnt/fourthcoffee, /mnt/vanarsdel and /mnt/southridge;
# pick <mount-name> (or mount one container per source system) so those paths resolve.
BLOB_ACCOUNT = "<account-name>"
BLOB_CONTAINER = "<container-name>"
MOUNT_POINT = "/mnt/<mount-name>"

# dbutils.fs.mount raises if the mount point is already in use, so skip it when re-running.
if not any(m.mountPoint == MOUNT_POINT for m in dbutils.fs.mounts()):
  dbutils.fs.mount(
    source = f"wasbs://{BLOB_CONTAINER}@{BLOB_ACCOUNT}.blob.core.windows.net",
    mount_point = MOUNT_POINT,
    extra_configs = {
      # The account named in this config key must be the same account as in `source`.
      f"fs.azure.account.key.{BLOB_ACCOUNT}.blob.core.windows.net":
        dbutils.secrets.get(scope = "<scope-name>", key = "<secret-name>")
    })


In [None]:
# List what is mounted under /mnt. dbutils.fs.ls takes a DBFS path, so unlike the relative
# shell path `dbfs/mnt/raw/` it does not depend on the driver's working directory.
display(dbutils.fs.ls("/mnt/"))

# Collect the raw catalog data from all three source systems

## From Fourth Coffee

In [None]:
# Fourth Coffee exports CSVs with a header row; Spark infers the column types.
fc_read_options = {"header": 'true', "inferSchema": 'true'}

fc_movies = spark.read.csv("/mnt/fourthcoffee/raw/Movies.csv", **fc_read_options)
fc_actors = spark.read.csv("/mnt/fourthcoffee/raw/Actors.csv", **fc_read_options)
fc_movie_actors = spark.read.csv("/mnt/fourthcoffee/raw/MovieActors.csv", **fc_read_options)
fc_mappings = spark.read.csv("/mnt/fourthcoffee/raw/OnlineMovieMappings.csv", **fc_read_options)

# Attach actor details through the MovieActors bridge table, then the ids of matching
# Southridge Video movies from OnlineMovieMappings. Every join is a left join, so movies
# without actors or without a Southridge match are kept.
fc_catalog = (
  fc_movies
  .join(fc_movie_actors, on='MovieId', how='left')
  .join(fc_actors, on='ActorId', how='left')
  .join(fc_mappings, on='MovieId', how='left')
)

## From VanArsdel

In [None]:
# VanArsdel CSVs have a header row; no schema inference, so every column is read as a string.
va_read_options = {"header": 'true'}

va_movies = spark.read.csv("/mnt/vanarsdel/raw/dbo.Movies.csv", **va_read_options)
va_actors = spark.read.csv("/mnt/vanarsdel/raw/dbo.Actors.csv", **va_read_options)
va_movie_actors = spark.read.csv("/mnt/vanarsdel/raw/dbo.MovieActors.csv", **va_read_options)
va_mappings = spark.read.csv("/mnt/vanarsdel/raw/dbo.OnlineMovieMappings.csv", **va_read_options)

# Attach actor details through the MovieActors bridge table, then the ids of matching
# Southridge Video movies from OnlineMovieMappings. Every join is a left join, so movies
# without actors or without a Southridge match are kept.
va_catalog = (
  va_movies
  .join(va_movie_actors, on='MovieId', how='left')
  .join(va_actors, on='ActorId', how='left')
  .join(va_mappings, on='MovieId', how='left')
)

## From Southridge

In [None]:
# Peek at the start of the raw Southridge JSON export. dbutils.fs.head reads the DBFS path
# directly, so it does not depend on the shell's working directory (the relative path
# `dbfs/mnt/...` only resolved if the cwd happened to be `/`).
print(dbutils.fs.head("/mnt/southridge/raw/catalog/movies.json", 2048))

In [None]:
# After reading the JSON, explode the actors array to create one row per (movie, actor).
# explode_outer (rather than explode) keeps movies whose actors array is null or empty, as a
# single row with a null actor. This matches the left joins used for Fourth Coffee and
# VanArsdel, which also keep movies with no actors.
# Drop the original actors array, as it is then no longer needed.
# Also drop the Cosmos DB metadata, as it will not be valuable here.
import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType

sr_catalog = (
  spark.read.json("/mnt/southridge/raw/catalog/movies.json")
  .withColumn('actor', F.explode_outer('actors'))
  .drop('actors', '_attachments', '_etag', '_rid', '_self', '_ts')
)

# Observe inconsistencies in the data types and formats

Let's take a look at the following discrepancies, which could cause fatal errors in downstream processing.

## Release dates and availability dates

Fourth Coffee and VanArsdel, Ltd. both seem to track the ReleaseDate for each movie. These are stored as strings, so we'll need to look into the formats they've used.

Southridge Video is storing a releaseYear, an availabilityDate, and a streamingAvailabilityDate.

In [None]:
# Line up the release/availability dates that each source recorded for the same movie.
# All three catalogs have one row per actor, so dedupe each date projection first;
# otherwise the joins fan out into one row per actor combination.
# Southridge ids are compared case-insensitively on both sides: the sample output below shows
# Fourth Coffee storing them upper-case and VanArsdel lower-case.
fc_dates = fc_catalog \
  .select(col('ReleaseDate').alias('FcReleaseDate'), col('MovieId'), col('OnlineMovieId')) \
  .distinct()
va_dates = va_catalog \
  .select(col('ReleaseDate').alias('VaReleaseDate'), col('MovieId'), col('OnlineMovieId')) \
  .distinct()
sr_dates = sr_catalog \
  .select('id', 'releaseYear', 'availabilityDate', 'streamingAvailabilityDate') \
  .distinct()

joined_dates = sr_dates \
  .join(fc_dates, F.upper(sr_dates.id) == F.upper(fc_dates.OnlineMovieId), how='left') \
  .join(va_dates, F.upper(sr_dates.id) == F.upper(va_dates.OnlineMovieId), how='left')

joined_dates \
  .filter('FcReleaseDate is not null AND VaReleaseDate is not null') \
  .limit(1) \
  .collect()

### Looking at the dates

```text
[Row(id='300e034e-4260-49ed-85e2-39a8d5030713',
    releaseYear=2015,
    availabilityDate='2017-07-25 00:00:00',
    streamingAvailabilityDate='2017-09-19 00:00:00',
    FcReleaseDate='07-25-2017',
    OnlineMovieId='300E034E-4260-49ED-85E2-39A8D5030713',
    VaReleaseDate='07-25-2017',
    OnlineMovieId='300e034e-4260-49ed-85e2-39a8d5030713')]
```

Above, we see that:  
  - Southridge has recorded a 2015 releaseYear for this movie
  - Southridge has recorded that the movie is available as of 25 Jul 2017
  - Southridge has recorded that the movie is available for streaming as of 19 Sep 2017
  - Fourth Coffee has recorded that the movie has a ReleaseDate of 25 Jul 2017
  - VanArsdel, Ltd. has similarly recorded the ReleaseDate as 25 Jul 2017

It looks like Fourth Coffee and VanArsdel, Ltd. are tracking the "release" in terms of when the movie became available to rent. Only Southridge Video has tracked the original theatrical release year. This detail was useful for their web-based front-end, but the brick-and-mortar stores previously had no business need for it.

Each of the source systems has stored dates in a different format, but we will use true date types in our conformed intermediate schema.

> Note: We are not only parsing the dates here, but also adding a column to track the SourceSystemId

In [None]:
def _to_date_if_string(df, column, fmt):
  """Return `df` with `column` parsed to a DateType using pattern `fmt`, if it is still a string.

  This cell reassigns fc_catalog / va_catalog / sr_catalog in place. Without the dtype guard,
  re-running it would re-parse an already-parsed date column against a string pattern, which
  nulls every value (or raises, depending on the Spark time-parser policy).
  """
  if dict(df.dtypes)[column] != 'string':
    return df
  return df.withColumn(column, F.to_date(col(column), fmt))

fc_catalog = _to_date_if_string(fc_catalog, 'ReleaseDate', 'MM-dd-yyyy') \
  .withColumn('SourceSystemId', F.lit('FC'))

va_catalog = _to_date_if_string(va_catalog, 'ReleaseDate', 'MM-dd-yyyy') \
  .withColumn('SourceSystemId', F.lit('VA'))

# Southridge stores full timestamps; keep only the date part, like the other sources.
sr_catalog = _to_date_if_string(sr_catalog, 'availabilityDate', 'yyyy-MM-dd HH:mm:ss')
sr_catalog = _to_date_if_string(sr_catalog, 'streamingAvailabilityDate', 'yyyy-MM-dd HH:mm:ss') \
  .withColumn('SourceSystemId', F.lit('SR'))

## Unioning the data

### Target schema

Looking ahead, we will keep every source record from every catalog, so we don't need to join here.
However, we will need to map columns to a consistent schema.

```
SourceSystemId:              Use the SourceSystemId we added in the previous cell.
CatalogId:                   Use a new unique identifier.
SourceSystemMovieId:         From Southridge, use the source id. From the others, use the source MovieId.
SouthridgeMovieId:           From Southridge, use the source id. From the others, use the source OnlineMovieId.
ActorID:                     From Southridge, this is null. From the others, it's the source ActorId.
ActorName:                   From Southridge, it's the exploded actor name. From the others, it is the ActorName.
ActorGender:                 Southridge does not track this data. The on premises stores have Gender.
Title:                       From Southridge, use title. From others, MovieTitle.
Genre:                       From Southridge, use genre. From others, Category.
Rating:                      Southridge has rating and the others have Rating.
RuntimeMinutes:              Southridge has runtime, the others have RunTimeMin.
TheatricalReleaseYear:       Southridge has releaseYear. The others don't have this data.
PhysicalAvailabilityDate:    Southridge has availabilityDate. The others have ReleaseDate.
StreamingAvailabilityDate:   Southridge has streamingAvailabilityDate. The others have no such data, as it does not apply to physical rentals.
```

### To join, cleanse, drop duplicates, etc. ... or not?

At this stage, we want to focus on the **fatal** anomalies that would cause exceptions in downstream processing;
e.g., inconsistent data types or formats.
If we were loading this data directly into a final reporting schema, we would likely apply additional cleansing such as:

- Look for and eliminate typos, e.g., PGg instead of PG
- Normalize capitalization of titles, names, ratings, etc.
- Look for and resolve conflicts in matched movies, e.g., Southridge thinks Mysterious Cube is a G-rated family movie while VanArsdel, Ltd. had it as a PG-13 rated Comedy
- Look for variations in actor names and choose one to use consistently throughout the reporting schema, e.g., Vivica A. Fox vs Vivica Fox
- Drop duplicates
- etc., etc., etc.

However, if we perform these operations now, then we may eliminate the opportunity to discover previously unrecognized value in the data.
As a contrived example, consider the possibility that some actors and actresses occasionally use their middle initial, but sometimes do not.
Now, imagine that data scientists uncover a trend where films are more marketable when the cast does use their middle initial versus when they do not.
Or maybe that only holds true in the Drama genre, but it does not hold in Family movies.
If we have already chosen the person's "usual" billing and only kept that version in our conformed dataset,
then the data scientists would never be able to see this.

In [None]:
# This is used in the following cells to create a new unique identifier per row.
# uuid4() returns a different value on every call, so the UDF must be marked non-deterministic.
# Spark treats UDFs as deterministic by default and may eliminate duplicate invocations or
# invoke the function more times than it appears in the query. That could hand a row
# inconsistent CatalogIds.
uuidUdf = F.udf(lambda : str(uuid.uuid4()), StringType()).asNondeterministic()

In [None]:
# Preview a few exploded Southridge rows. `limit` keeps this a DataFrame, which `display`
# renders as a table; `take` would hand it a plain Python list of Rows instead.
display(sr_catalog.limit(4))

In [None]:
import uuid
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType

# Map the Southridge catalog onto the conformed schema. Southridge's own id doubles as the
# SouthridgeMovieId. Southridge has no actor id or gender, so those become string-typed null
# placeholders. Column order mirrors va_conformed / fc_conformed.
sr_conformed_columns = [
  col("SourceSystemId"),
  col("id").alias("SourceSystemMovieId"),
  col("id").alias("SouthridgeMovieId"),
  col("availabilityDate").cast(TimestampType()).alias("PhysicalAvailabilityDate"),
  col("streamingAvailabilityDate").cast(TimestampType()).alias("StreamingAvailabilityDate"),
  col("genre").alias("Genre"),
  col("title").alias("Title"),
  col("rating").alias("Rating"),
  col("runtime").alias("RuntimeMinutes"),
  col("releaseYear").alias("TheatricalReleaseYear"),
  sr_catalog["actor.name"].alias("ActorName"),
]

sr_conformed = (
  sr_catalog
  .select(sr_conformed_columns)
  .withColumn("ActorId", F.lit(None).cast(StringType()))
  .withColumn("ActorGender", F.lit(None).cast(StringType()))
  .withColumn("CatalogId", uuidUdf())
)

In [None]:
# VanArsdel and Fourth Coffee share the same source schema, so a single helper maps both onto
# the conformed schema (same column order as sr_conformed).

def _conform_on_prem_catalog(catalog_df):
  """Map a VanArsdel or Fourth Coffee catalog DataFrame onto the conformed catalog schema.

  - ActorID comes from the source ActorId (the actor's key), as the target schema specifies.
    MovieActorID identifies the MovieActors bridge row, not the actor.
  - SouthridgeMovieId is lower-cased to match the format of Southridge's own `id`. Fourth Coffee
    stores these ids upper-case; VanArsdel's appeared lower-case in the sample above.
  - Columns that only Southridge tracks are typed null placeholders.
  """
  return catalog_df \
    .select([
      col("SourceSystemId"),
      col("MovieID").alias("SourceSystemMovieId"),
      F.lower(col("OnlineMovieID")).alias("SouthridgeMovieId"),
      col("ReleaseDate").cast(TimestampType()).alias("PhysicalAvailabilityDate"),
      F.lit(None).cast(TimestampType()).alias("StreamingAvailabilityDate"),
      col("Category").alias("Genre"),
      col("MovieTitle").alias("Title"),
      col("Rating").alias("Rating"),
      col("RunTimeMin").cast(LongType()).alias("RuntimeMinutes"),
      F.lit(None).cast(LongType()).alias("TheatricalReleaseYear"),
      col("ActorName"),
      col("ActorId").alias("ActorID"),
      col("Gender").alias("ActorGender")]) \
    .withColumn("CatalogId", uuidUdf())

va_conformed = _conform_on_prem_catalog(va_catalog)
fc_conformed = _conform_on_prem_catalog(fc_catalog)

In [None]:
# ADLS Gen2 account key used by the abfss:// write and read below.
# Pull it from a Databricks secret scope instead of pasting the key into the notebook:
# notebook source and revision history are easily shared.
spark.conf.set(
  'fs.azure.account.key.<account-name>.dfs.core.windows.net'
  , dbutils.secrets.get(scope='<scope-name>', key='<secret-name>')
)

In [None]:
# The full catalog is now a straightforward union.
# unionByName matches columns by name rather than by position, so a reordered select in one
# of the conformed frames cannot silently shift values into the wrong column. (The
# ActorId/ActorID spelling difference is resolved by Spark's default case-insensitive
# column resolution.)
full_catalog = sr_conformed.unionByName(va_conformed).unionByName(fc_conformed)

# Write to the same /catalog location that the rehydration check reads from.
full_catalog.write.mode("overwrite").parquet("abfss://<container-name>@<account-name>.dfs.core.windows.net/catalog")

In [None]:
# Let's rehydrate and confirm that everything checks out

# Same container placeholder as the write, so both cells resolve to one location.
rehydrated_catalog = spark.read.parquet("abfss://<container-name>@<account-name>.dfs.core.windows.net/catalog")

# Every conformed row should have round-tripped through parquet; a mismatch usually means the
# read path is not the path that was just written.
assert rehydrated_catalog.count() == full_catalog.count(), \
  "rehydrated row count differs from full_catalog"

sr_rehydrated = rehydrated_catalog.filter("SourceSystemId=='SR'")

# One row per Southridge movie, with how many (movie, actor) rows it produced.
sr_actors_per_movie = sr_rehydrated \
  .groupby(col('SourceSystemMovieId')) \
  .agg(F.count(F.lit(1)).alias('ActorCount'))

print('The number of rows from Southridge is ', sr_rehydrated.count())
print('The number of distinct movies from Southridge is', sr_actors_per_movie.count())
print(sr_actors_per_movie.limit(1).collect())

In [None]:
# Print the first 20 rows of the unioned catalog, truncating long cell values (show()'s defaults).
full_catalog.show(n=20, truncate=True)