# Laboratory work N1
## Imports and settings

In [1]:
import numpy as np
import pandas as pd

from typing import Union, Any
import configparser

import psycopg2
from psycopg2.extras import DictCursor, RealDictCursor

from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL

import json
import datetime
import time

In [2]:
import warnings
# Silence every warning category for the rest of the session.
# NOTE(review): this also hides deprecation and pandas chained-assignment
# warnings raised by the code below.
warnings.filterwarnings('ignore')
# Show all dataframe columns when a frame is displayed.
pd.set_option('display.max_columns', None)

In [3]:
def config(filename: str = 'database.ini', section: str = 'postgresql') -> dict[str, Any]:
    """Read one section of an .ini file and return its options as a dict.

    Args:
        filename: Path of the .ini file to parse.
        section: Name of the section whose options are returned.

    Returns:
        Mapping of option name to option value for the requested section.

    Raises:
        Exception: If the parsed file has no such section.
    """
    ini = configparser.ConfigParser()
    ini.read(filename)
    if not ini.has_section(section):
        raise Exception('Invalid .ini file')
    return {option: value for option, value in ini.items(section)}

## Utils

In [4]:
import re

def camel_to_snake(string: str) -> str:
    """Convert a CamelCase / mixedCase identifier to lower-case snake_case."""
    # Pass 1: put an underscore in front of every capitalised word that
    # follows another character ("CamelCase" -> "Camel_Case").
    words_split = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', string)
    # Pass 2: separate a lower-case letter or digit from a capital after it.
    fully_split = re.sub('([a-z0-9])([A-Z])', r'\1_\2', words_split)
    return fully_split.lower()

def remove_consecutive_underscores(string: str) -> str:
    """Collapse every run of underscores in *string* into a single underscore."""
    underscore_run = re.compile('_+')
    return underscore_run.sub('_', string)
        
def get_rename_mapping(columns: list[str]) -> dict[str, str]:
    """Map each column name to its snake_case form without repeated underscores.

    Args:
        columns: Original column names.

    Returns:
        Dict keyed by the original name, valued by the normalised name.
    """
    return {
        original: remove_consecutive_underscores(camel_to_snake(original))
        for original in columns
    }

def extract_number(string: str) -> Union[str, None]:
    """Return the first run of decimal digits found in *string*.

    The argument is converted with ``str()`` first, so non-string values
    (numbers, NaN, None) are accepted as well.

    Args:
        string: Value to search.

    Returns:
        The first group of consecutive digits as a string, or ``None`` when
        the text contains no digit.
    """
    # Raw string: '\d' in a normal literal is an invalid escape sequence.
    match = re.search(r'\d+', str(string))
    return match.group(0) if match else None

In [5]:
def get_intersection(f_series: pd.Series, s_series: pd.Series) -> pd.Series:
    """Return the values present in both series, ignoring missing values.

    The result comes from ``np.intersect1d`` and is wrapped in a new Series.
    """
    first_values = f_series.dropna()
    second_values = s_series.dropna()
    shared = np.intersect1d(first_values, second_values)
    return pd.Series(shared)

In [6]:
def extract_date(data: pd.Series) -> pd.DataFrame:
    """Split a series of dates into ``day``, ``month`` and ``year`` columns.

    Values are parsed with ``pd.to_datetime(..., errors='coerce')``.

    Args:
        data: Series of date-like values.

    Returns:
        DataFrame with the columns ``day``, ``month`` and ``year``.
    """
    parsed = pd.to_datetime(data, errors='coerce')
    accessor = parsed.dt
    return pd.DataFrame({
        part: getattr(accessor, part)
        for part in ('day', 'month', 'year')
    })

In [7]:
def get_values_string(values_number: int) -> str:
    """Build a parenthesised, comma-separated list of ``%s`` placeholders.

    For example ``get_values_string(3)`` gives ``'(%s,%s,%s)'``.
    """
    repeated = '%s,' * values_number
    without_trailing_comma = repeated[:-1]
    return ''.join(['(', without_trailing_comma, ')'])

## Postgres utils
### QuerySet wrapper

In [8]:
class QuerySet:
    """Small wrapper around a cursor-like object that exposes its fetch calls.

    Both accessors return ``None`` when the wrapped object is falsy.
    """

    def __init__(self, query_set):
        self.query_set = query_set

    def all(self):
        """Return ``fetchall()`` of the wrapped object, or ``None`` if it is falsy."""
        return self.query_set.fetchall() if self.query_set else None

    def one(self):
        """Return ``fetchone()`` of the wrapped object, or ``None`` if it is falsy."""
        return self.query_set.fetchone() if self.query_set else None

### Postgres API

In [9]:
class PgAPI:
    """Static helpers for running SQL through psycopg2.

    Connection parameters come from ``config()``. Every helper opens a new
    autocommit connection through ``get_db()``.
    """

    @staticmethod
    def get_db():
        """Open and return a new autocommit connection."""
        params = config()
        conn = psycopg2.connect(**params)
        conn.autocommit = True
        return conn

    @staticmethod
    def get_cursor(cursor_factory=None):
        """Return a cursor on a freshly opened connection.

        Args:
            cursor_factory: Optional psycopg2 cursor class passed to
                ``connection.cursor``.
        """
        return PgAPI.get_db().cursor(cursor_factory=cursor_factory)

    @staticmethod
    def _run(query: str, args: tuple, cursor_factory=None) -> QuerySet:
        """Execute *query* with *args* and wrap the open cursor in a QuerySet."""
        # The cursor (and its connection) must stay open: the caller fetches
        # the rows later through the returned QuerySet.
        cursor = PgAPI.get_cursor(cursor_factory)
        cursor.execute(query, args)
        return QuerySet(cursor)

    @staticmethod
    def execute_query(query: str, *args) -> QuerySet:
        """Run *query* with positional parameters using the default cursor."""
        return PgAPI._run(query, args)

    @staticmethod
    def execute_dict_query(query: str, *args) -> QuerySet:
        """Run *query* with positional parameters using a ``DictCursor``."""
        return PgAPI._run(query, args, DictCursor)

    @staticmethod
    def execute_rdict_query(query: str, *args) -> QuerySet:
        """Run *query* with positional parameters using a ``RealDictCursor``."""
        return PgAPI._run(query, args, RealDictCursor)

    @staticmethod
    def execute_call(query: str, *args) -> None:
        """Run a statement whose result is not needed.

        Nothing is fetched, so the cursor and the connection are closed as
        soon as the statement has run instead of being left open.
        """
        conn = PgAPI.get_db()
        try:
            with conn.cursor() as cursor:
                cursor.execute(query, args)
        finally:
            conn.close()

### Database utils (init / drop / clear)

In [10]:
def init_db(file_name: str = 'schema.sql') -> None:
    """Read the SQL script *file_name* and execute its whole content."""
    with open(file_name, 'r') as schema_file:
        schema_sql = schema_file.read()
    PgAPI.execute_call(schema_sql)

def init_dates() -> None:
    """Invoke the ``fill_dates()`` stored procedure."""
    PgAPI.execute_call('CALL fill_dates()')

def clear_db() -> None:
    """Truncate every table of the ``public`` schema with ``CASCADE``."""
    # quote_ident() returns each name already double-quoted when it needs it
    # (mixed case, reserved words, special characters), so the name can be
    # placed into the TRUNCATE statement safely.
    t_query = (
        "SELECT quote_ident(tablename) FROM pg_tables "
        "WHERE schemaname='public'"
    )
    records = PgAPI.execute_query(t_query).all()
    for record in records:
        PgAPI.execute_call('TRUNCATE ' + record[0] + ' CASCADE')

### Create SQLAlchemy engine

In [11]:
# Build the SQLAlchemy engine from the same .ini options psycopg2 uses.
params = config()
params['username'] = params.pop('user')  # rename key
params['drivername'] = 'postgresql'
# SQLAlchemy 1.4+ builds URLs with URL.create(); calling URL(...) directly is
# deprecated there. Older releases have no create(), so fall back to URL.
conn_url = getattr(URL, 'create', URL)(**params)
engine = create_engine(conn_url)

### QueryBuilder

In [12]:
def select(t_name: str, columns: list[str]) -> str:
    """Build a ``SELECT <columns> FROM <t_name>`` statement.

    For the ``dates`` table queried with fewer than three columns, a filter
    ``WHERE day IS NULL AND month IS NULL`` is appended. Otherwise the
    statement ends with a single trailing space.
    """
    pieces = ['SELECT', ', '.join(columns), 'FROM', t_name]
    if t_name == 'dates' and len(columns) < 3:
        pieces.append('WHERE day IS NULL AND month IS NULL')
    else:
        pieces.append('')
    return ' '.join(pieces)

In [13]:
def insert(t_name: str, df: pd.DataFrame) -> None:
    """Append the rows of *df* to table *t_name* through the module-level engine.

    The dataframe index is not written.
    """
    to_sql_options = {'con': engine, 'if_exists': 'append', 'index': False}
    df.to_sql(t_name, **to_sql_options)

In [14]:
def where(columns: list[str], operators: list[str], values: list[str]) -> str:
    """Build a ``WHERE`` clause joining ``column operator value`` triples with AND.

    The three lists are read position by position; the number of conditions
    is the number of columns.
    """
    conditions = []
    for position, column in enumerate(columns):
        condition = ' '.join((column, operators[position], values[position]))
        conditions.append(condition)
    return ' '.join(('WHERE', ' AND '.join(conditions)))

## Providers between pandas dataframes and database tables

### Insert unique data from dataset to the database table

In [15]:
def insert_unique(t_name: str, df: pd.DataFrame, fields: Union[str, list[str], dict[str, str]]) -> None:
    """Insert into table *t_name* the distinct rows of *df* it does not hold yet.

    Args:
        t_name: Name of the target table.
        df: Source dataframe.
        fields: Columns to compare and insert. A single name or a list of
            names is used for both the dataframe and the table; a dict maps
            dataframe column names (keys) to table column names (values).
    """
    if isinstance(fields, str): fields = [fields]
    
    if isinstance(fields, dict):
        df_cols, db_cols = zip(*fields.items())
    else: df_cols = db_cols = tuple(fields)
    
    # Current table content for the compared columns.
    query = select(t_name, db_cols)
    data = PgAPI.execute_dict_query(query).all()
    # Distinct candidate rows from the dataframe.
    df_data = df[list(df_cols)].drop_duplicates()
    mapping = dict(zip(df_cols, db_cols))
    
    # Nothing fetched from the table: every distinct row is inserted.
    if not data:
        insert(t_name, df_data.rename(columns=mapping))
        return
    
    db_data = pd.DataFrame(data, columns=db_cols)
    # NOTE(review): copies the dataframe row labelled 0 into db_data under
    # label -1. The intent is not evident from this code (possibly to align
    # dtypes before the merge; confirm). As written it raises KeyError when
    # df_data has no row labelled 0, and when the column names coincide that
    # row always matches in the merge below, so it is never inserted here.
    db_data.loc[-1] = df_data.loc[0]
    
    # Left merge with an indicator column: 'left_only' marks rows that are
    # present in the dataframe but not in db_data.
    new_data = df_data.merge(
        right=db_data, how='left',
        left_on=df_cols, right_on=db_cols,
        indicator=True
    )
    # Drop the indicator and the table-side key columns whose names differ
    # from the dataframe-side ones.
    drops = ['_merge', *set(db_cols).difference(df_cols)]
    
    to_insert = new_data[new_data['_merge']=='left_only']\
        .drop(columns=drops)\
        .rename(columns=mapping)

    insert(t_name, to_insert)

### Join dataframe and database table

In [16]:
def join(t_name: str, df: pd.DataFrame, fields: Union[str, list[str], dict[str, str]], out: str) -> pd.DataFrame:
    """Replace key columns of *df* with the ``id`` of the matching table row.

    Args:
        t_name: Table to look the rows up in; it must have an ``id`` column.
        df: Source dataframe (not modified).
        fields: Key columns. A single name or a list of names is used for
            both sides; a dict maps dataframe column names (keys) to table
            column names (values).
        out: Name of the new column that receives the matched ids.

    Returns:
        A copy of *df* on a fresh 0..n-1 index, without the key columns and
        with the *out* column added. Rows without a match get a missing id.
    """
    if isinstance(fields, str):
        fields = [fields]

    if isinstance(fields, dict):
        df_cols, db_cols = zip(*fields.items())
    else:
        df_cols = db_cols = tuple(fields)

    # merge() numbers its result 0..n-1, while the id column is assigned back
    # by index label. Put the frame on the same positional index first;
    # otherwise a gapped index (e.g. after dropna) attaches ids to the wrong
    # rows and leaves others empty.
    df = df.reset_index(drop=True)

    db_columns = ('id', ) + db_cols
    query = select(t_name, db_columns)
    data = PgAPI.execute_dict_query(query).all()
    db_data = pd.DataFrame(data, columns=list(db_columns))

    join_ids = df[list(df_cols)].merge(
        right=db_data, how='left',
        left_on=list(df_cols), right_on=list(db_cols)
    )['id']

    to_return = df.drop(list(df_cols), axis=1)
    to_return[out] = join_ids
    return to_return

## Mapping between dataframes and tables

### Base class

In [17]:
class Table:
    """A dataframe being prepared for loading into the database.

    Attributes are plain instance fields:
      df          -- working dataframe (a private copy of the selected columns)
      fact_type   -- fact type name; also the base name of the JSON config
      config_name -- ``<fact_type>.json``, read from ``configs/`` by ``load``
    """

    def __init__(self, df: pd.DataFrame, fields: list[str],
            fact_type: str) -> None:
        # Explicit copy: the frame is modified in place later
        # (``self.df[col] = ...``), which must neither touch the caller's
        # dataframe nor act on an ambiguous slice of it.
        self.df = df[fields].copy()
        self.fact_type = fact_type
        self.config_name = f'{fact_type}.json'

    def rename(self, mapping: dict[str, str]) -> None:
        """Rename columns of the working dataframe according to *mapping*."""
        self.df = self.df.rename(columns=mapping)

    def dropna(self, columns: list[str]) -> None:
        """Drop rows that have a missing value in any of *columns*."""
        self.df = self.df.dropna(subset=columns)

    def load_foreign(self, mapping: dict[str, list]) -> None:
        """Fill referenced tables and swap key columns for their ids.

        Args:
            mapping: Table name -> either one ``[fields, id_name]`` pair or a
                list of such pairs. For every pair the distinct values are
                inserted into the table and the key columns of the working
                dataframe are replaced by the column *id_name*.
        """
        for t_name, params in mapping.items():
            # A lone pair is recognised by its second item (the id column
            # name) not being a list; wrap it so both shapes iterate alike.
            # The length check lets a one-element list of pairs through too.
            if len(params) == 2 and not isinstance(params[1], list):
                params = [params]

            for fields, id_name in params:
                insert_unique(t_name, self.df, fields)
                self.df = join(t_name, self.df, fields, id_name)

    def __repr__(self) -> str:
        return repr(self.df)

    def load(self) -> None:
        """Read ``configs/<config_name>`` and apply it with ``load_foreign``."""
        with open(f'configs/{self.config_name}') as fhand:
            data = json.load(fhand)
        self.load_foreign(data)

    def load_fact(self) -> None:
        """Wrap the working dataframe in a ``Fact`` and insert it into ``facts``."""
        fact = Fact(self.df, self.fact_type)
        insert('facts', fact.df)

In [18]:
class Fact(Table):
    """Rows ready for the ``facts`` table, tagged with their fact type."""

    def __init__(self, df: pd.DataFrame, f_type: str) -> None:
        # Keep every column; the fact type passed to the base class is empty.
        super().__init__(df, list(df.columns), '')
        self.df['type_name'] = f_type
        # Replace the type name with its id from the fact_types table.
        type_lookup = {'fact_types': [['type_name'], 'type_id']}
        self.load_foreign(type_lookup)
        required_ids = ['movie_id', 'movie_date_id', 'type_id']
        self.dropna(required_ids)

In [19]:
class Ranking(Table):
    """Ranking dataset prepared for loading (config ``ranking.json``).

    Column names are converted to snake_case. The ``film`` column is expected
    to hold text of the form ``"<title> (<year>)"``: the year is moved to a
    ``year`` column and the title is kept in ``film``.
    """

    def __init__(self, df: pd.DataFrame, year: int) -> None:
        fields = [el for el in df.columns if el not in [
            'Fandango_Stars', 'Fandango_Difference',
            'RT_norm_round', 'RT_user_norm_round',
            'Metacritic_user_norm_round',
            'IMDB_norm_round', 'Metacritic_norm_round'
        ]]
        super().__init__(df, fields, 'ranking')
        mapping = get_rename_mapping(self.df.columns)
        self.rename(mapping)
        self._parse_dates()
        self._parse_films()
        self._init_ids()
        self.dropna(['film', 'year'])
        self.df['ranking_year'] = year

    def _parse_dates(self) -> None:
        """Add a ``year`` column taken from the trailing "(<year>)" of ``film``."""
        # Reset the index before deriving the years so that the new column is
        # built on the same index it is assigned to.
        self.df = self.df.reset_index(drop=True)
        # Use the last "(" so that parentheses inside the title do not end up
        # in the text converted to int.
        self.df['year'] = self.df['film'].apply(
            lambda s: s[s.rindex('(')+1:-1]
        ).astype(int)

    def _parse_films(self) -> None:
        """Reduce ``film`` to the title that precedes the trailing "(<year>)"."""
        # rstrip() removes the space that separates the title from "(".
        self.df['film'] = self.df['film'].apply(
            lambda s: s[:s.rindex('(')].rstrip()
        )

    def _init_ids(self) -> None:
        """Add one ``<type>_type_id`` column per ranking type.

        Each column holds the id looked up in ``ranking_types`` by type name.
        """
        types = ['fandango', 'metacritic', 'rotten_tomatoes', 'imdb']
        query = 'SELECT id FROM ranking_types WHERE type_name = %s'
        for type_name in types:
            id_name = '_'.join([type_name, 'type', 'id'])
            query_set = PgAPI.execute_query(query, type_name)
            self.df[id_name] = query_set.one()[0]

In [20]:
class Movie(Table):
    """Movies dataset prepared for loading (config ``movie.json``).

    Drops a few source columns, renames the rest to the names used below,
    splits ``date_published`` into day / month / year, converts the money
    columns to numbers and tags every row with the type name ``imdb``.
    """

    def __init__(self, df: pd.DataFrame) -> None:
        fields = [el for el in df.columns if el not in [
            'title', 'imdb_title_id',
            'metascore', 'year', 'actors'
        ]]
        super().__init__(df, fields, 'movie')
        self.rename({
            'original_title': 'name',
            'reviews_from_critics': 'critics_reviews',
            'reviews_from_users': 'users_reviews',
            'usa_gross_income': 'usa_income',
            'worlwide_gross_income': 'worldwide_income',
            'votes': 'users_votes',
            'avg_vote': 'users_score'
        })
        self._parse_dates()
        self._parse_currencies()
        self.dropna([
            'director', 'language', 'production_company',
            'writer', 'genre', 'year', 'country', 'name'
        ])
        self.df['type_name'] = 'imdb'

    def _parse_currencies(self) -> None:
        """Convert the money columns to float using their first digit group."""
        for column in ['usa_income', 'worldwide_income', 'budget']:
            self.df[column] = self.df[column]\
                .apply(extract_number)\
                .astype(float)

    def _parse_dates(self) -> None:
        """Replace ``date_published`` with ``day``, ``month`` and ``year`` columns."""
        # Reset the index before extracting the parts: concat(axis=1) aligns
        # on index labels, so both frames must share the same index.
        self.df = self.df.reset_index(drop=True)
        date_parts = extract_date(self.df['date_published'])
        self.df = pd.concat([self.df, date_parts], axis=1)
        self.df = self.df.drop(['date_published'], axis=1)

In [21]:
class Oscar(Table):
    """Oscars dataset prepared for loading (config ``oscar.json``)."""

    def __init__(self, df: pd.DataFrame) -> None:
        # All source columns are kept.
        super().__init__(df, list(df.columns), 'oscar')
        column_names = {
            'ceremony': 'ceremony_number',
            'winner': 'is_winner'
        }
        self.rename(column_names)
        required = ['film', 'category', 'name']
        self.dropna(required)

## Load datasets from .csv

In [22]:
# Read the three source datasets into dataframes.
movies = pd.read_csv('datasets/imdb.csv')
oscars = pd.read_csv('datasets/oscars.csv')
rankings = pd.read_csv('datasets/ranking.csv')

## Fill database

In [23]:
# clear_db() is left disabled; enable it to truncate the existing tables first.
#clear_db()
# Execute schema.sql, then call the fill_dates() stored procedure.
init_db()
init_dates()

## Fill types tables

### Fill fact types

In [24]:
# Insert one fact_types row per name, one "(%s)" placeholder group each.
fact_types = ('oscar', 'movie', 'ranking')
values_string = ('(%s),' * len(fact_types)).rstrip(',')
PgAPI.execute_call(
    'INSERT INTO fact_types (type_name) VALUES ' + values_string,
    *fact_types,
)

### Fill ranking types

In [25]:
# Insert one ranking_types row per name, one "(%s)" placeholder group each.
ranking_types = ('imdb', 'metacritic', 'rotten_tomatoes', 'fandango')
values_string = ('(%s),' * len(ranking_types)).rstrip(',')
PgAPI.execute_call(
    'INSERT INTO ranking_types (type_name) VALUES ' + values_string,
    *ranking_types,
)

## Load datasets to database

In [26]:
# Prepare the rankings dataset (ranking year 2015) and load the tables it
# references, as described in configs/ranking.json.
ranking = Ranking(rankings, year=2015)
ranking.load()

In [27]:
# Prepare the movies dataset and load the tables it references, as described
# in configs/movie.json.
movie = Movie(movies)
movie.load()

In [28]:
# Prepare the oscars dataset and load the tables it references, as described
# in configs/oscar.json.
oscar = Oscar(oscars)
oscar.load()

## Load facts

In [29]:
# Insert the prepared movie rows into the facts table.
movie.load_fact()

In [30]:
# Insert the prepared oscar rows into the facts table.
oscar.load_fact()

In [31]:
# Insert the prepared ranking rows into the facts table.
ranking.load_fact()