In [25]:
import os
import json
import pandas as pd
import logging
import requests
import datetime

In [29]:
# Date window (start, end) used for the ratings fetch below.
# NOTE(review): the name suggests the day after `ds`, but this is a year later —
# presumably deliberate to pull more practice data; confirm.
ds, next_ds = "2019-01-01", "2020-01-02"

In [30]:
# MovieLens API location; each value can be overridden by an env var of the same name.
MOVIELENS_HOST, MOVIELENS_SCHEMA, MOVIELENS_PORT = (
    os.environ.get("MOVIELENS_HOST", "localhost"),
    os.environ.get("MOVIELENS_SCHEMA", "http"),
    os.environ.get("MOVIELENS_PORT", "5000"),
)

# API credentials. The fallbacks are only suitable for a local dev setup —
# set MOVIELENS_USER / MOVIELENS_PASSWORD in the environment for anything else.
MOVIELENS_USER, MOVIELENS_PASSWORD = (
    os.environ.get("MOVIELENS_USER", "airflow"),
    os.environ.get("MOVIELENS_PASSWORD", "airflow"),
)

In [31]:
def _get_session():
    """Create an authenticated HTTP session for the MovieLens API.

    Credentials are taken from MOVIELENS_USER / MOVIELENS_PASSWORD (set them
    via the environment rather than editing this function).

    Returns:
        tuple: ``(session, base_url)`` where ``session`` is a
        ``requests.Session`` whose ``auth`` is the (user, password) pair, and
        ``base_url`` is ``"<schema>://<host>:<port>"``.
    """
    base_url = "{}://{}:{}".format(MOVIELENS_SCHEMA, MOVIELENS_HOST, MOVIELENS_PORT)

    session = requests.Session()
    session.auth = (MOVIELENS_USER, MOVIELENS_PASSWORD)

    return session, base_url

In [32]:
# Build a session here only to inspect it; _get_ratings creates its own session.
session, base_url = _get_session()

In [33]:
# Inspect the session object returned by _get_session.
session

<requests.sessions.Session at 0x7c6a78593b60>

In [34]:
# API root assembled from the MOVIELENS_* settings.
base_url

'http://localhost:5000'

In [35]:
def _get_with_pagination(session, url, params, batch_size=100):
    
    offset = 0
    total = None
    while total is None or offset < total:
        response = session.get(url,
                               params={
                                   **params,
                                   **{"offset": offset, "limit": batch_size}
                               })
        response.raise_for_status()
        response_json = response.json()

        yield from response_json["result"] # list

        offset += batch_size
        total = response_json["total"]

In [36]:
def _get_ratings(start_date, end_date, batch_size=100):
    """Lazily stream rating records between two dates from the MovieLens API.

    This is a generator function: no session is created and no request is
    sent until the returned generator is first iterated.

    Args:
        start_date: forwarded as the ``start_date`` query parameter.
        end_date: forwarded as the ``end_date`` query parameter.
        batch_size: number of records requested per page.

    Yields:
        Individual rating records from the ``/ratings`` endpoint.
    """
    session, base_url = _get_session()
    ratings_url = base_url + "/ratings"
    date_window = dict(start_date=start_date, end_date=end_date)

    yield from _get_with_pagination(
        session=session, url=ratings_url, params=date_window, batch_size=batch_size
    )

In [37]:
# _get_ratings is a generator function: this only creates the generator;
# no HTTP request is made until it is iterated.
a1 = _get_ratings(ds, next_ds)
a1

<generator object _get_ratings at 0x7c6a6db65440>

In [38]:
# Pull the first rating; this triggers the first page request.
next(a1)

{'movieId': 196997, 'rating': 4.0, 'timestamp': 1546301166, 'userId': 27667}

In [39]:
# Pull the next rating from the same generator.
next(a1)

{'movieId': 1213, 'rating': 5.0, 'timestamp': 1546301290, 'userId': 27667}

In [41]:
def _fetch_ratings(templates_dict, batch_size=1000, **_):
    logger = logging.getLogger(__name__)
    
    start_date = templates_dict["start_date"]
    end_date = templates_dict["end_date"]
    output_path = templates_dict["output_path"]
    logger.info(f"Fetching ratings from {start_date} to {end_date} into {output_path}")

    ratings = list(
        _get_ratings(start_date, end_date, batch_size=batch_size)
    )

    logger.info(f"Fetched {len(ratings)} ratings")

    logger.info(f"Writing ratings to {output_path}")

    output_dir = os.path.dirname(output_path)
    os.makedirs(output_dir, exist_ok=True)

    print(ratings)

    with open(output_path, "w") as f:
        json.dump(ratings, f)

In [42]:
# _fetch_ratings writes its result to disk and has no return value, so `c1` is
# always None and displaying it shows nothing.
fetch_templates = {
    "start_date": ds,
    "end_date": next_ds,
    "output_path": f"/tmp/practice/ratings{ ds }.json",
}
c1 = _fetch_ratings(templates_dict=fetch_templates, batch_size=10_000)
c1

[{'movieId': 196997, 'rating': 4.0, 'timestamp': 1546301166, 'userId': 27667}, {'movieId': 1213, 'rating': 5.0, 'timestamp': 1546301290, 'userId': 27667}, {'movieId': 55820, 'rating': 4.0, 'timestamp': 1546301706, 'userId': 27667}, {'movieId': 2329, 'rating': 4.0, 'timestamp': 1546301788, 'userId': 27667}, {'movieId': 4878, 'rating': 4.0, 'timestamp': 1546301842, 'userId': 27667}, {'movieId': 589, 'rating': 4.0, 'timestamp': 1546302155, 'userId': 128817}, {'movieId': 119145, 'rating': 2.0, 'timestamp': 1546302225, 'userId': 27667}, {'movieId': 2716, 'rating': 4.0, 'timestamp': 1546302513, 'userId': 27667}, {'movieId': 165, 'rating': 4.0, 'timestamp': 1546302548, 'userId': 27667}, {'movieId': 57669, 'rating': 4.0, 'timestamp': 1546302649, 'userId': 27667}, {'movieId': 5418, 'rating': 4.0, 'timestamp': 1546302687, 'userId': 27667}, {'movieId': 377, 'rating': 4.5, 'timestamp': 1546302763, 'userId': 27667}, {'movieId': 1387, 'rating': 3.5, 'timestamp': 1546303369, 'userId': 27667}, {'movie

In [43]:
# Listing 8.6: load the fetched ratings and rank movies by average rating

In [45]:
# Load the ratings written by the fetch step above.
ratings_json_path = f"/tmp/practice/ratings{ ds }.json"
df = pd.read_json(ratings_json_path)
df

Unnamed: 0,movieId,rating,timestamp,userId
0,196997,4.0,2019-01-01 00:06:06,27667
1,1213,5.0,2019-01-01 00:08:10,27667
2,55820,4.0,2019-01-01 00:15:06,27667
3,2329,4.0,2019-01-01 00:16:28,27667
4,4878,4.0,2019-01-01 00:17:22,27667
...,...,...,...,...
99995,122914,4.0,2019-11-21 08:37:19,85523
99996,8874,4.0,2019-11-21 09:01:09,85523
99997,134130,3.5,2019-11-21 09:02:11,85523
99998,7458,4.0,2019-11-21 09:04:38,85523


In [46]:
# Per-movie mean rating and number of ratings (non-null userId rows).
df.groupby("movieId").agg(
    avg_rating=("rating", "mean"),
    num_ratings=("userId", "count"),
)

Unnamed: 0_level_0,avg_rating,num_ratings
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
1,3.993007,143
2,3.500000,49
3,2.625000,4
4,1.250000,2
5,3.083333,6
...,...,...
209119,3.500000,1
209123,4.000000,1
209135,3.500000,1
209147,1.000000,1


In [47]:
# Same aggregation, keeping only movies with more than two ratings.
(
    df.groupby("movieId")
    .agg(avg_rating=("rating", "mean"), num_ratings=("userId", "count"))
    .query("num_ratings > 2")
)

Unnamed: 0_level_0,avg_rating,num_ratings
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
1,3.993007,143
2,3.500000,49
3,2.625000,4
5,3.083333,6
6,3.912500,40
...,...,...
206805,3.333333,3
206845,3.166667,3
207309,3.400000,5
207405,4.333333,3


In [48]:
# ...then order from lowest to highest average rating, ties broken by count.
(
    df.groupby("movieId")
    .agg(avg_rating=("rating", "mean"), num_ratings=("userId", "count"))
    .query("num_ratings > 2")
    .sort_values(by=["avg_rating", "num_ratings"], ascending=True)
)

Unnamed: 0_level_0,avg_rating,num_ratings
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
57532,1.000000,3
34520,1.083333,6
737,1.100000,5
3593,1.111111,9
50798,1.125000,4
...,...,...
61406,4.833333,3
147124,4.833333,3
163112,4.833333,3
92475,5.000000,3


In [49]:
# Keep the sorted per-movie stats (worst-rated first) under a name.
ranking = (
    df.groupby("movieId")
    .agg(avg_rating=("rating", "mean"), num_ratings=("userId", "count"))
    .query("num_ratings > 2")
    .sort_values(by=["avg_rating", "num_ratings"], ascending=True)
)
ranking

Unnamed: 0_level_0,avg_rating,num_ratings
movieId,Unnamed: 1_level_1,Unnamed: 2_level_1
57532,1.000000,3
34520,1.083333,6
737,1.100000,5
3593,1.111111,9
50798,1.125000,4
...,...,...
61406,4.833333,3
147124,4.833333,3
163112,4.833333,3
92475,5.000000,3


In [50]:
def rank_movies_by_rating(ratings, min_ratings=2):
    """Rank movies by their average rating.

    Args:
        ratings: DataFrame with ``movieId``, ``rating`` and ``userId`` columns
            (presumably one row per rating).
        min_ratings: a movie needs strictly more than this many ratings to be
            kept (default 2, i.e. at least 3 ratings).

    Returns:
        DataFrame indexed by ``movieId`` with ``avg_rating`` (mean ``rating``)
        and ``num_ratings`` (count of non-null ``userId`` rows). It is sorted
        ascending by ``avg_rating`` then ``num_ratings``, so the lowest-rated
        movies come first.
        NOTE(review): worst-first order is unusual for a "ranking"; confirm
        this is intended.
    """
    ranking = (
        ratings.groupby("movieId")
        .agg(
            avg_rating=pd.NamedAgg(column="rating", aggfunc="mean"),
            num_ratings=pd.NamedAgg(column="userId", aggfunc="count"),
        )
        # Filter on the caller's threshold rather than a hard-coded constant.
        .loc[lambda stats: stats["num_ratings"] > min_ratings]
        .sort_values(["avg_rating", "num_ratings"], ascending=True)
    )
    return ranking

In [51]:
# Listing 8.7: the ranking step as a PythonOperator-style callable

In [52]:
def _rank_movies(templates_dict, min_ratings=2, **_):
    input_path = templates_dict["input_path"]
    output_path = templates_dict["output_path"]

    rankings = pd.read_json(input_path)
    ranking = rank_movies_by_rating(rankings, min_ratings=min_ratings)

    output_dir = os.path.dirname(output_path)
    os.makedirs(output_dir, exist_ok=True)

    ranking.to_json(output_path, index=True)

In [56]:
def rank_movies_pythonoperator():
    """Run the ranking step for ``ds`` the way an operator would call it.

    Reads ``/tmp/practice/ratings<ds>.json`` and writes
    ``/tmp/practice/rankings<ds>.json`` via ``_rank_movies`` with
    ``min_ratings=2``.
    """
    input_path = f"/tmp/practice/ratings{ ds }.json"
    output_path = f"/tmp/practice/rankings{ ds }.json"
    _rank_movies(
        {"input_path": input_path, "output_path": output_path},
        min_ratings=2,
    )

In [57]:
# Run the ranking step: reads the ratings file for `ds` and writes its rankings file.
rank_movies_pythonoperator()