In [1]:
import requests
import pandas as pd
import numpy as np
import traceback
import mysql as mysql
from mysql import connector
from mysql.connector import Error
import os as os
os.chdir("/Users/balilty/Downloads/wix")



In [2]:
# config (dict): connection dictionary containing user, password, host, database
config = pd.read_csv("config.csv").to_dict('records')[0]

In [3]:
#task 1 
def get_api_data():
    """get API data
            Returns:
                DataFrame: the results returned in pandas dataframe.
    """
    res = requests.get("https://randomuser.me/api/?results=4500").json()
    df = pd.json_normalize(res.get('results'),sep="_")
    
    return df


In [4]:
def create_table(df,table_name:str):
    """generate a dynamic create table statement based on each column max length and max int width
            Parameters:
                df (DataFrame): dataframe to generate
                table name (str): the table name to create in DB

            Returns:
                String: create table statement
    """
    measurer = np.vectorize(len)
    max_char_for_object = measurer(df.select_dtypes(include=[object]).values.astype(str)).max(axis=0)
    max_int_width = measurer(df.select_dtypes(include=[int]).values.astype(str)).max(axis=0)

    # genarte dynamic create table statement based on columns dtypes
    create_stat = f"CREATE TABLE IF NOT EXISTS {table_name} (" + \
    ", ".join([x + ' VARCHAR ('+str(max_char_for_object[i])+')' for i,x in enumerate(df.select_dtypes(include=[object]))]) +\
    "," + ", ".join([x + ' INT ('+str(max_int_width[i])+')' for i,x in enumerate(df.select_dtypes(include=[int]))]) +");"

    return create_stat


In [5]:
def create_table_and_insert_data(df,table_name:str,config):
    """drop old table if exists, create new table and insert data to DB
            Parameters:
                df (DataFrame): dataframe to insert DB
                table name (str): the name of the table to create
                config (dict): connection dictionary containing user, password, host, database
    """
    connection = mysql.connector.connect(**config)
    cursor = connection.cursor(buffered=True)
    
    # drop and execute table if exists
    drop_stat = f"""DROP TABLE IF EXISTS {table_name};"""
    cursor.execute(drop_stat)
    
    # check if df not empty, create table and insert data
    if df.shape[0]>0:
        #dynamic create statement based on each column max length and max int width
        create_stat = create_table(df,table_name)
        # execute create table statement
        cursor.execute(create_stat)

        # prepare list of tuples for each value
        data = [tuple(x) for x in df.values.tolist()]

        # prepare dynamic value list  (%s, %s...)
        values = ", ".join(['%s' for c in df.columns])
        
        #prepare dynamic columns names
        columns_name = ", ".join(df.columns)

        # generate dynamic insert statement based on columns_name and values
        insert_stat = f"INSERT INTO {table_name} (" + \
                        columns_name + ")  VALUES (" +\
                        values +")"
        # execute
        cursor.executemany(insert_stat, data)
       
    # if df is empty, create table with dtype length 1
    else:
        create_stat = f"CREATE TABLE IF NOT EXISTS {table_name} (" +\
        ", ".join([x + ' VARCHAR (1)' for x in df.select_dtypes(include=[object])])  +\
        "," + ", ".join([x + ' INT (1)' for x in df.select_dtypes(include=[int])]) +");"
        
        # execute
        cursor.execute(create_stat)
        
    connection.commit()
    cursor.close()
    connection.close()

    


In [6]:
# task 2
def task_2(df,config):
    """split df to 2 datasets by gender and create new tables in DB
            Parameters:
                df (DataFrame): dataframe to split and insert DB
                config (dict): connection dictionary containing user, password, host, database
    """
    df_male = df.loc[df.gender=='male']
    df_female = df.loc[df.gender=='female']

    # save data to DB
    create_table_and_insert_data(df_male,"Kobi_Balilty_test_male",config)
    create_table_and_insert_data(df_female,"Kobi_Balilty_test_female",config)
    
# I can do it also with Kobi_Balilty_test_male.to_sql("Kobi_Balilty_test_male", con=engine, index=False, if_exists='replace',method='multi',chunksize=1000)
# but it's slower

In [7]:
# task 3
def task_3(df):
    """split df to 10 datasets by gender age group of 10 (10s,20s 30s...)
            Parameters:
                df (DataFrame): dataframe to split
            Returns:
                df_by_group_age (dict): dictionary of DataFrames, with key of gender age group and value of DataFrame with all values that belong to the group.
                                        for exmple to see data that belong to group with 20s you need to: df_by_group_age[20]
    """
    cut_bins = [x for x in range(0,101,10)]
    cut_names = [str(x)+'s' for x in cut_bins[:-1]]
    df['group_age'] = pd.cut(df['dob_age'], bins=cut_bins, labels=cut_names, right=False)
    df_by_group_age = {i:y for i,(x, y) in enumerate(df.groupby('group_age'))}
    
    return df_by_group_age

In [8]:
# task 4
def task_4(df_by_group_age,config):
    """insert each gender age groups to tables in DB
            Parameters:
                df_by_group_age (dict): dictionary to split and insert each DataFrame to DB
                config (dict): connection dictionary containing user, password, host, database
    """
    for index, df in df_by_group_age.items():
        #df['group_age'] = df['group_age'].astype(object)
        df.drop('group_age',axis=1,inplace=True)
        create_table_and_insert_data(df, f"Kobi_Balilty_test_{index}", config)
        


In [9]:
def get_data(query,config):
    """execute query and get the data from DB
            Parameters:
                sql_query (str): the sql query to be executed as string
                config (dict): connection dictionary containing user, password, host, database

            Returns:
                DataFrame: the results returned in pandas dataframe. if no results empty df returned.
    """
    df = pd.DataFrame()
    connection = mysql.connector.connect(**config)
    cursor = connection.cursor(buffered=True)
    
    cursor.execute(query)
    
    if cursor.rowcount > 0:
        df = pd.DataFrame(data=cursor.fetchall(), columns=cursor.column_names)
    cursor.close()
    connection.close()
    
    return df

In [10]:
#task 5
def task_5():
    """get top 20 last registered males and females from DB
            Returns:
                DataFrame: the results returned in pandas dataframe 
    """
    query = """select * from(
        select *,
        dense_rank() over (
        order by `registered_date` desc
        )  as rank_id
        from interview.Kobi_Balilty_test_female) as t
        where rank_id <= 20
        union all 
        select * from(
        select *,
        dense_rank() over (
        order by `registered_date` desc
        )  as rank_id
        from interview.Kobi_Balilty_test_male) as t
        where rank_id <= 20"""
    Kobi_Balilty_test_20 = get_data(query, config)
    Kobi_Balilty_test_20.drop('rank_id',inplace=True,axis=1)
    create_table_and_insert_data(Kobi_Balilty_test_20, "Kobi_Balilty_test_20", config)
    
    return Kobi_Balilty_test_20


In [11]:
# task 6
def task_6(Kobi_Balilty_test_20,config):
    """get data from table Kobi_Balilty_test_5 and merge it with Kobi_Balilty_test_20
            Parameters:
                Kobi_Balilty_test_20 (DataFrame): dataframe with top 20 last registered males and females
                config (dict): connection dictionary containing user, password, host, database

            Returns:
                DataFrame: the results returned in pandas dataframe
                json: the results returned in json file
    """
    query = """select * from interview.Kobi_Balilty_test_5"""
    Kobi_Balilty_test_5 = get_data(query,config)
    first_json = Kobi_Balilty_test_5.merge(Kobi_Balilty_test_20, how='outer').to_json("first.json")
    task_6 = Kobi_Balilty_test_5.merge(Kobi_Balilty_test_20, how='outer')
    
    return first_json, task_6


In [12]:
# task 7
def task_7(Kobi_Balilty_test_20,config):
    """get data from table Kobi_Balilty_test_2 and concat it with Kobi_Balilty_test_20
            Parameters:
                Kobi_Balilty_test_20 (DataFrame): dataframe with top 20 last registered males and females
                config (dict): connection dictionary containing user, password, host, database

            Returns:
                DataFrame: the results returned in pandas dataframe
                json: the results returned in json file
    """
    query = """select * from interview.Kobi_Balilty_test_2"""
    Kobi_Balilty_test_2 = get_data(query,config)
    second_json = pd.concat([Kobi_Balilty_test_2, Kobi_Balilty_test_20],ignore_index=True).to_json("second.json")
    task_7 = pd.concat([Kobi_Balilty_test_2, Kobi_Balilty_test_20],ignore_index=True)
    return second_json, task_7



In [13]:
def main():
    status = 1
    try:
        df = get_api_data()
        task_2(df,config)
        df_by_group_age = task_3(df)
        task_4(df_by_group_age,config)
        Kobi_Balilty_test_20 = task_5()
        task_6(Kobi_Balilty_test_20,config)
        task_7(Kobi_Balilty_test_20,config)
    except Error as e:
        print("MYSql failed {}".format(e))
        status = 0
    except Exception as e:
        print("error {}".format(e))
        print("Exception occurred: {} - {}".format(e, traceback.format_exc()))
        status = 0
    finally:
        if status == 1:
            print("Program End")
        else:
            print("Program End With error")


    

In [14]:
main()

Program End
