In [1]:
import pandas as pd
import polars as pl
import time
import json
import re
import ast

In [1]:
import pyarrow as pa
import pyarrow.parquet as pa
import pyarrow.compute as pc

## Genres

In [None]:
@asset(
    description="Create table 'genres' for each genre.",
    io_manager_key=ABS_IO_MANAGER,
    group_name=LAYER,
    key_prefix=KEY_PREFIX,
    compute_kind="Pandas",
    ins={
        "bronze_movies_metadata": AssetIn(
            key_prefix=UPSTREAM_KEY_PREFIX,
        ),
    },
)
def silver_genres(context, bronze_movies_metadata):
    table_name = "silver_genres"
    
    schema = pa.schema([
        ("genre_id", pa.int64()),
        ("name", pa.string())
    ])
    
    columns = ["genre_id", "name"]
    
    pd_data = pa.Table.from_pandas(pd.DataFrame(columns=columns), schema=schema)

    t_start = time.time()

    try:
        i = 1
        while True:
            batch_tstart = time.time()

            df = next(bronze_movies_metadata)
            
            for row in df["genres"]:
                genres_list = ast.literal_eval(row)

                for genre in genres_list:
                    if pd_data.filter(pc.field("genre_id") == genre["id"]).num_rows > 0:
                        continue

                    record = pa.Table.from_pandas(
                        pd.DataFrame([[genre["id"], genre["name"]]], columns=columns),
                        schema=schema
                    )
            
                    pd_data = pa.concat_tables([pd_data, record])
    
            print(f"Completed batch {i} in: {time.time() - batch_tstart} seconds.")
            i += 1
    except StopIteration:
        pd_data = pd_data.to_pandas()
        pd_data.sort_values(by=["genre_id"], inplace=True)
        context.log.info(f"Check duplicated genres: {pd_data.duplicated(['genre_id']).any()}")
        context.log.info(f"Completed transforming in: {time.time() - t_start} seconds.")
        return Output(
            pd_data,
            metadata={
                "table name": table_name,
                "records count": pd_data.shape[0],
                "columns count": pd_data.shape[1],
                "columns": pd_data.columns.to_list(),
            }
        )
    except Exception as e:
        context.log.error(str(e))

## Movies Genres

In [None]:
@asset(
    description="Create table 'movies_genres' for each movie.",
    io_manager_key=ABS_IO_MANAGER,
    group_name=LAYER,
    key_prefix=KEY_PREFIX,
    compute_kind="Pandas",
    ins={
        "bronze_movies_metadata": AssetIn(
            key_prefix=UPSTREAM_KEY_PREFIX,
        ),
    },
)
def silver_movies_genres(context, bronze_movies_metadata):
    table_name = "silver_movies_genres"
    
    schema = pa.schema([
        ("tmdb_id", pa.int64()),
        ("genre_id", pa.int64())
    ])
    columns = ["tmdb_id","genre_id"]
    
    pd_data = pa.Table.from_pandas(pd.DataFrame(columns=columns), schema=schema)
    
    t_start = time.time()

    try:
        i = 1
        while True:
            batch_tstart = time.time()

            df = next(bronze_movies_metadata)

            for index, row in df.iterrows():
                tmdb_id = row["id"]
                genres_list = ast.literal_eval(row["genres"])

                for genre in genres_list:
                    record = pa.Table.from_pandas(
                        pd.DataFrame([[tmdb_id, genre["id"]]], columns=columns),
                        schema=schema
                    )
                    pd_data = pa.concat_tables([pd_data, record])

            context.log.info(f"Completed batch {i} in: {time.time() - batch_tstart} seconds.")
            i += 1    
    except StopIteration:
        pd_data.sort_values(by=["tmdb_id"], inplace=True)
        context.log.info(f"Check duplicated rows: {pd_data.duplicated().any()}")
        context.log.info(f"Completed transforming in: {time.time() - t_start} seconds.")
        return Output(
            pd_data,
            metadata={
                "table name": table_name,
                "records count": pd_data.shape[0],
                "columns count": pd_data.shape[1],
                "columns": pd_data.columns.to_list(),
            }
        )
    except Exception as e:
        context.log.error(str(e))

In [10]:
import time
import ast
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
df1 = pq.ParquetFile("./tmp/bronze_movies_metadata.parquet")
schema = pa.schema([
        ("tmdb_id", pa.int64()),
        ("genre_id", pa.int64())
    ])
columns = ["tmdb_id","genre_id"]
pd_data = pa.Table.from_pandas(pd.DataFrame(columns=columns), schema=schema)
#arrow_writer = pa.RecordBatchStreamWriter("example.arrow", arrow_schema)

i = 1
for bronze_movies_metadata in df1.iter_batches(batch_size=4500):
    batch_tstart = time.time()
    df = bronze_movies_metadata.to_pandas()
    #df = next(bronze_movies_metadata)

    for index, row in df.iterrows():
        tmdb_id = int(row["id"])
        genres_list = ast.literal_eval(row["genres"])

        for genre in genres_list:
            #print(type(genre["id"]))
            #print(type(genre["name"]))
            record = pa.Table.from_pandas(
                pd.DataFrame([[tmdb_id, genre["id"]]], columns=columns),
                schema=schema
            )
            pd_data = pa.concat_tables([pd_data, record])

    print(f"Completed batch {i} in: {time.time() - batch_tstart} seconds.")
    i += 1

Completed batch 1 in: 7.554168701171875 seconds.
Completed batch 2 in: 19.536564826965332 seconds.
Completed batch 3 in: 47.36430239677429 seconds.
Completed batch 4 in: 98.90609931945801 seconds.


ValueError: invalid literal for int() with base 10: '1997-08-20'

In [None]:
pd_data

## Companies

In [None]:
@asset(
    description="Create table 'companies' for each production company.",
    io_manager_key=ABS_IO_MANAGER,
    group_name=LAYER,
    key_prefix=KEY_PREFIX,
    compute_kind="Pandas",
    ins={
        "bronze_movies_metadata": AssetIn(
            key_prefix=UPSTREAM_KEY_PREFIX,
        ),
    },
)
def silver_companies(context, bronze_movies_metadata):
    table_name = "silver_companies"

    columns=["comp_id", "name"]
    
    pd_data = pd.DataFrame()

    t_start = time.time()

    try:
        i = 1
        while True:
            batch_tstart = time.time()

            df = next(bronze_movies_metadata)

            for row in df["production_companies"]:
                comps_list = ast.literal_eval(row)

                for comp in comps_list:
                    if pd_data.isin([comp["id"]]).any().any():
                        continue
                    pd_data.loc[len(pd_data)] = [comp["id"], comp["name"]]

            context.log.info(f"Completed batch {i} in: {time.time() - batch_tstart} seconds.")
            i += 1    
    except StopIteration:
        pd_data.sort_values(by=["comp_id"], inplace=True)
        context.log.info(f"Check duplicated companies: {pd_data.duplicated(['comp_id']).any()}")
        context.log.info(f"Completed transforming in: {time.time() - t_start} seconds.")
        return Output(
            pd_data,
            metadata={
                "table name": table_name,
                "records count": pd_data.shape[0],
                "columns count": pd_data.shape[1],
                "columns": pd_data.columns.to_list(),
            }
        )
    except Exception as e:
        context.log.error(str(e))

## Movies Production

## Languages

## Movies