### Read and process VOD Clickstream Kaggle dataset from Netflix

## First field is row ID

In [1]:
!uv pip install pandas seaborn numpy matplotlib pillow confluent-kafka

[2K[2mResolved [1m16 packages[0m [2min 193ms[0m[0m                                        [0m
[2K[2mPrepared [1m13 packages[0m [2min 772ms[0m[0m                                            
[2K[2mInstalled [1m13 packages[0m [2min 23ms[0m[0m                               [0m
 [32m+[39m [1mconfluent-kafka[0m[2m==2.6.1[0m
 [32m+[39m [1mcontourpy[0m[2m==1.3.1[0m
 [32m+[39m [1mcycler[0m[2m==0.12.1[0m
 [32m+[39m [1mfonttools[0m[2m==4.55.0[0m
 [32m+[39m [1mkiwisolver[0m[2m==1.4.7[0m
 [32m+[39m [1mmatplotlib[0m[2m==3.9.2[0m
 [32m+[39m [1mnumpy[0m[2m==2.1.3[0m
 [32m+[39m [1mpandas[0m[2m==2.2.3[0m
 [32m+[39m [1mpillow[0m[2m==11.0.0[0m
 [32m+[39m [1mpyparsing[0m[2m==3.2.0[0m
 [32m+[39m [1mpytz[0m[2m==2024.2[0m
 [32m+[39m [1mseaborn[0m[2m==0.13.2[0m
 [32m+[39m [1mtzdata[0m[2m==2024.2[0m


In [14]:
!uv pip install fastavro

[2K[2mResolved [1m1 package[0m [2min 103ms[0m[0m                                          [0m
[2K[2mPrepared [1m1 package[0m [2min 67ms[0m[0m                                               
[2K[2mInstalled [1m1 package[0m [2min 2ms[0m[0m                                  [0m
 [32m+[39m [1mfastavro[0m[2m==1.9.7[0m


In [1]:
import warnings
warnings.filterwarnings('ignore')
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

In [2]:
import pandas as pd
from datetime import date
import os

df = pd.read_csv('vodclickstream_uk_movies_03.csv')
# Rename columns in place
# df.rename(columns={'Unnamed: 0': 'row_id'}, inplace=True)

df.head(5)


Unnamed: 0,row_id,datetime,duration,title,genres,release_date,movie_id,user_id
0,58773,2017-01-01 01:15:09,0.0,"Angus, Thongs and Perfect Snogging","Comedy, Drama, Romance",2008-07-25,26bd5987e8,1dea19f6fe
1,58774,2017-01-01 13:56:02,0.0,The Curse of Sleeping Beauty,"Fantasy, Horror, Mystery, Thriller",2016-06-02,f26ed2675e,544dcbc510
2,58775,2017-01-01 15:17:47,10530.0,London Has Fallen,"Action, Thriller",2016-03-04,f77e500e7a,7cbcc791bf
3,58776,2017-01-01 16:04:13,49.0,Vendetta,"Action, Drama",2015-06-12,c74aec7673,ebf43c36b6
4,58777,2017-01-01 19:16:37,0.0,The SpongeBob SquarePants Movie,"Animation, Action, Adventure, Comedy, Family, ...",2004-11-19,a80d6fc2aa,a57c992287


In [3]:
from confluent_kafka import Producer, Consumer

def read_config():
  # reads the client configuration from client.properties
  # and returns it as a key-value map
  config = {}
  with open("client.properties") as fh:
    for line in fh:
      line = line.strip()
      if len(line) != 0 and line[0] != "#":
        parameter, value = line.strip().split('=', 1)
        config[parameter] = value.strip()
  return config

In [4]:
from confluent_kafka.schema_registry import SchemaRegistryClient
config = read_config()

schema_registry_client = SchemaRegistryClient({
  'url': config['schema.registry.url'],
  'basic.auth.user.info': '{}:{}'.format(config['schema.key'], config['schema.secret'])
})

subject_name = 'netflixbehavior-value'
schema_str = schema_registry_client.get_latest_version(subject_name).schema.schema_str
print(schema_str)

{"type":"record","name":"netflixbehavior","namespace":"com.netflix.audience","doc":"kaggle uk netflix audience data.","fields":[{"name":"row_id","type":"int","doc":"row id"},{"name":"datetime","type":["string","null"],"doc":"datetime of the viewing"},{"name":"duration","type":["double","null"],"doc":"how long was it watched?"},{"name":"title","type":"string","doc":"title of the video"},{"name":"genres","type":["string","null"],"doc":"A list of comma-delimited movie/tv genres that apply to the title."},{"name":"release_date","type":["string","null"],"doc":"date of the titles release."},{"name":"movie_id","type":"string","doc":"id of the movie."},{"name":"user_id","type":"string","doc":"id of the user."}]}


In [10]:
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import StringSerializer
from datetime import datetime
import pprint
import time

t0 = time.time()

config = read_config()

schema_registry_client = SchemaRegistryClient({
  'url': config['schema.registry.url'],
  'basic.auth.user.info': '{}:{}'.format(config['schema.key'], config['schema.secret'])
})

subject_name = 'netflixbehavior-value'
schema_str = schema_registry_client.get_latest_version(subject_name).schema.schema_str
# print(schema_str)

avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer_conf = {
    'bootstrap.servers': config['bootstrap.servers'],
    'security.protocol': config['security.protocol'],
    'sasl.mechanisms': config['sasl.mechanisms'],
    'sasl.username': config['sasl.username'],
    'sasl.password': config['sasl.password'],
    'value.serializer': avro_serializer,
    'key.serializer': StringSerializer(),
    'batch.size': 200000,
    'linger.ms': 100,
    'compression.type': 'Lz4',
    'acks': 1
}

#batch.size: increase to 100000–200000 (default 16384)
#linger.ms: increase to 10–100 (default 0)
#compression.type=lz4 (default none, i.e., no compression)
#acks=1 (default all, since Apache Kafka version 3.0)

producer = SerializingProducer(producer_conf)
producer.flush

i = 0

for index, row in df.iterrows():
    try: 
        value = row.to_dict()
        producer.produce(topic='netflixbehavior', key=str(value['row_id']), value=value)
        i = i + 1
        if (i % 10000 == 0):
            print(i)
            producer.flush

    except Exception as ex:
        print("Error:", ex)
        # if Error: Local: Queue full
        producer.flush
        time.sleep(10)
        producer.poll(0)
        # Producer terminating with 100000 messages (11521537 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
        # https://developer.confluent.io/tutorials/optimize-producer-throughput/confluent.html

t1 = time.time()
print(f"Loaded rows time: {round(t1-t0, 4)} seconds")
print("Row Count")
print(len(df.index) )


%6|1732720738.003|GETSUBSCRIPTIONS|rdkafka#producer-6| [thrd:main]: Telemetry client instance id changed from AAAAAAAAAAAAAAAAAAAAAA to Ta1UqVU1Sha2lvVU9PSB5g


10000
20000
30000
40000
50000
60000
70000
80000
90000
100000
Error: Local: Queue full


%4|1732720751.812|TERMINATE|rdkafka#producer-2| [thrd:app]: Producer terminating with 100000 messages (11521537 bytes) still in queue or transit: use flush() to wait for outstanding message delivery
%4|1732720751.829|TERMINATE|rdkafka#producer-3| [thrd:app]: Producer terminating with 100000 messages (11521537 bytes) still in queue or transit: use flush() to wait for outstanding message delivery


KeyboardInterrupt: 