In [1]:
from pathlib import Path
from datetime import date, datetime, timedelta
import csv
import json
import configparser

import requests
import psycopg2
from psycopg2 import sql

import numpy as np
import pandas as pd

In [2]:
# Locations of the working data directory and the configuration directory,
# given relative to the directory the notebook is executed from.
DATA_PATH = Path('../data')
CONFIG_PATH = Path('../config')

In [3]:
# Load source URLs and database settings from the INI file.
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so the displayed result confirms the file was found.
config = configparser.ConfigParser()
config.read(CONFIG_PATH/'test_sources.ini')

['../config/test_sources.ini']

In [4]:
# Pull the individual settings out of the [DEFAULT] section.
# NOTE(review): SHARED_DB_CON and PRIVATE_DB_CON are not referenced in the
# cells visible here (only the *_DSN values are) - confirm they are still needed.
CSV_URL = config['DEFAULT']['CSV_URL']
JSON_URL = config['DEFAULT']['JSON_URL']
SHARED_DB_CON = config['DEFAULT']['SHARED_DB_CON']
PRIVATE_DB_CON = config['DEFAULT']['PRIVATE_DB_CON']
SHARED_DSN = config['DEFAULT']['SHARED_DSN']
PRIVATE_DSN = config['DEFAULT']['PRIVATE_DSN']

**csv**

In [5]:
def download_file(url, dir_path, timeout=60):
    """Download ``url`` and store it inside ``dir_path``.

    The file name is the last ``/``-separated segment of the URL. The response
    body is written as raw bytes, so the file on disk is exactly what the
    server sent and does not depend on the local default encoding or newline
    translation (the body is no longer validated as UTF-8 on the way).

    Args:
        url: Address of the file to fetch.
        dir_path: Existing directory (``pathlib.Path``) to save the file into.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled server.

    Returns:
        Path of the saved file.

    Raises:
        requests.HTTPError: The server answered with a 4xx/5xx status.
        requests.Timeout: The server did not respond within ``timeout``.
    """
    file_name = url.split('/')[-1]
    file_path = dir_path/file_name

    with requests.get(url, timeout=timeout) as r:
        r.raise_for_status()

        # Binary mode: no decode/re-encode round trip, no newline rewriting.
        with open(file_path, 'wb') as f:
            f.write(r.content)

    return file_path

# Fetch the raw orders file; the trailing expression displays where it was saved.
orders_file_path = download_file(CSV_URL, DATA_PATH)
orders_file_path

PosixPath('../data/orders.csv')

In [6]:
!head -2 {orders_file_path}

id заказа,uuid заказа,название товара,дата заказа,количество,ФИО,email
76921,7d9a28f7-418b-4e8a-9ab3-68bf5deba724,Банк триста плод сынок неудобно поезд неожиданный.,2020-05-15 8:31:05,3,Елизавета Семеновна Юдина,belovaanna@mail.ru


In [79]:
def clean_field(field):
    """Return ``field`` with leading and trailing whitespace removed.

    ``csv.DictReader`` yields ``None`` for columns missing from a short row
    and a list for surplus columns of a long row; such non-string values are
    returned unchanged instead of raising ``AttributeError``.
    """
    if isinstance(field, str):
        return field.strip()
    return field

In [88]:
def clean_orders_data(file_path):
    """Write a cleaned copy of the orders CSV next to the original.

    The source header line is discarded and replaced by fixed English column
    names. The ``order_id`` column is dropped and every remaining text value
    is passed through ``clean_field``.

    Args:
        file_path: ``pathlib.Path`` of the raw orders CSV.

    Returns:
        Path of the cleaned file, ``<stem>_clean.csv`` in the same directory.
    """
    clean_file_path = file_path.parent/f'{file_path.stem}_clean.csv'
    field_names = ('order_id', 'order_uuid', 'good_title',
                   'date', 'amount', 'name', 'email')
    selected_field_names = ('order_uuid', 'good_title', 'date',
                            'amount', 'name', 'email')

    # newline='' is what the csv module expects: it then handles line endings
    # (including newlines embedded in quoted fields) itself.
    with open(file_path, newline='') as f, \
            open(clean_file_path, 'w', newline='') as f_clean:
        # Skip the original header; the default keeps an empty file from
        # raising StopIteration.
        next(f, None)
        reader = csv.DictReader(f, fieldnames=field_names)
        writer = csv.DictWriter(f_clean,
                                fieldnames=selected_field_names,
                                extrasaction='ignore')
        writer.writeheader()

        for row in reader:
            # Only the selected columns are cleaned. Values missing from a
            # short row are None and are left as-is (written as empty cells).
            writer.writerow({
                name: clean_field(row[name]) if isinstance(row[name], str) else row[name]
                for name in selected_field_names
            })

    return clean_file_path

# Produce the cleaned orders CSV and display its path.
clean_orders_file_path = clean_orders_data(orders_file_path)
clean_orders_file_path




PosixPath('../data/orders_clean.csv')

In [83]:
!head -5 {clean_orders_file_path}

order_uuid,good_title,date,amount,name,email
7d9a28f7-418b-4e8a-9ab3-68bf5deba724,Банк триста плод сынок неудобно поезд неожиданный.,2020-05-15 8:31:05,3,Елизавета Семеновна Юдина,belovaanna@mail.ru
f70a0b8c-f6cd-4876-b21b-33ee76d2d993,Пропадать беспомощный равнодушный.,2020-06-01 3:08:10,5,Ершов Милий Григорьевич,doroninleon@rao.com
5507582b-992f-4eb0-97e6-6d4b5d66b06c,Что мягкий роса научить необычный домашний командование настать.,2020-05-12 21:50:55,4,Маслов Твердислав Фадеевич,doroninatatjana@rambler.ru
c48a1e2e-06cf-4cb5-b697-0c695227b450,Товар назначить медицина секунда увеличиваться.,2020-05-29 7:17:52,3,Анисим Харитонович Лазарев,komarovepifan@yahoo.com


**json**

In [9]:
# Fetch the payment-status JSON. The saved file takes its name from the last
# URL segment, so it has no extension (see the displayed path).
status_file_path = download_file(JSON_URL, DATA_PATH)
status_file_path

PosixPath('../data/5ed7391379382f568bd22822')

In [10]:
def clean_status_data(file_path):
    """Convert the payment-status JSON into a two-column CSV.

    The JSON document is read as a mapping ``{order_uuid: {"success": ...}}``.
    Each entry becomes one row whose ``payment_status`` is ``'success'`` only
    when the ``success`` value is exactly ``True`` and ``'failure'`` otherwise.

    Args:
        file_path: ``pathlib.Path`` of the downloaded JSON file.

    Returns:
        Path of the CSV, ``<stem>_clean.csv`` in the same directory.

    Raises:
        KeyError: An entry has no ``success`` key.
    """
    with open(file_path) as f:
        status_data = json.load(f)

    clean_file_path = file_path.parent/f'{file_path.stem}_clean.csv'
    selected_field_names = ('order_uuid', 'payment_status')

    # newline='' stops the platform from rewriting the csv module's own
    # line terminators.
    with open(clean_file_path, 'w', newline='') as clean_f:
        writer = csv.DictWriter(clean_f, fieldnames=selected_field_names)
        writer.writeheader()

        for order_uuid, status in status_data.items():
            # Identity check on purpose: truthy non-boolean values such as
            # 1 or "true" do not count as a successful payment.
            payment_status = 'success' if status['success'] is True else 'failure'
            writer.writerow({
                'order_uuid': order_uuid,
                'payment_status': payment_status,
            })

    return clean_file_path
            
# Produce the cleaned status CSV and display its path.
clean_status_file_path = clean_status_data(status_file_path)
clean_status_file_path

PosixPath('../data/5ed7391379382f568bd22822_clean.csv')

In [11]:
!head -2 {status_clean_file_path}

head: cannot open '{status_clean_file_path}' for reading: No such file or directory


**Shared DB**

In [12]:
def get_table_data(dsn, table_name, data_path):
    """Dump a whole database table to ``<data_path>/<table_name>.csv``.

    The export uses ``COPY (SELECT * FROM <table>) TO STDOUT WITH CSV HEADER``,
    so the first line of the file holds the column names. The table name is
    quoted as a single SQL identifier.

    Args:
        dsn: Connection string passed to ``psycopg2.connect``.
        table_name: Name of the table to export.
        data_path: Existing directory (``pathlib.Path``) for the CSV file.

    Returns:
        Path of the written CSV file.
    """
    file_path = data_path/f'{table_name}.csv'

    table_id = sql.Identifier(table_name)
    sql_query = sql.SQL("COPY (SELECT * FROM {}) TO STDOUT WITH CSV HEADER").format(table_id)

    conn = psycopg2.connect(dsn)
    try:
        # newline='' keeps the line endings exactly as the server produced them.
        with conn.cursor() as cur, open(file_path, 'w', newline='') as f:
            cur.copy_expert(sql_query, f)
    finally:
        # Close the connection even when the COPY or the file open fails.
        conn.close()

    return file_path

In [13]:
# Export the customers table from the shared database and display the file path.
customers_file_path = get_table_data(SHARED_DSN, 'customers', DATA_PATH)
customers_file_path

PosixPath('../data/customers.csv')

In [14]:
!head -2 {customers_file_path}

id,name,birth_date,gender,email
97766,Нинель Васильевна Носова ,1975-07-19,F,belovaanna@mail.ru


In [15]:
def calculate_age(birth_date, datetime_format, now=None):
    """Return the age in whole years for a birth date given as text.

    The age is derived from calendar dates: it increases on the birthday
    itself. Dividing the elapsed days by 365.25 instead reports one year too
    few on the birthday in most years (365 days // 365.25 == 0).

    Args:
        birth_date: Birth date (optionally with time) as a string.
        datetime_format: ``strptime`` format describing ``birth_date``.
        now: Reference moment as a ``datetime``; defaults to the current
            local time. Supplying it makes the result reproducible.

    Returns:
        Completed years as an ``int`` (negative when ``birth_date`` lies in a
        later calendar year position than ``now``).

    Raises:
        ValueError: ``birth_date`` does not match ``datetime_format``.
    """
    born = datetime.strptime(birth_date, datetime_format)
    if now is None:
        now = datetime.now()

    # Tuple comparison answers "has this year's birthday been reached yet?".
    # A 29 February birthday counts as reached on 1 March in non-leap years.
    birthday_reached = (now.month, now.day) >= (born.month, born.day)
    age = now.year - born.year - (0 if birthday_reached else 1)
    return age

# Quick check of calculate_age using a timestamp-style format.
# This module-level datetime_format is separate from the local variable of
# the same name that clean_customers_data defines for date-only values.
datetime_format = '%Y-%m-%d %H:%M:%S'
calculate_age('2020-06-01 3:08:10', datetime_format)

0

In [16]:
def clean_customers_data(file_path):
    """Reduce the customers CSV to ``email`` and ``age`` columns.

    The source header line is skipped, ``age`` is computed from
    ``birth_date`` with ``calculate_age`` and all other columns are dropped.

    Args:
        file_path: ``pathlib.Path`` of the exported customers CSV with the
            columns id, name, birth_date, gender, email.

    Returns:
        Path of the cleaned file, ``<stem>_clean.csv`` in the same directory.
    """
    clean_file_path = file_path.parent/f'{file_path.stem}_clean.csv'
    field_names = ('id', 'name', 'birth_date', 'gender', 'email')
    selected_field_names = ('email', 'age')
    datetime_format = '%Y-%m-%d'

    # newline='' is what the csv module expects for both reading and writing.
    with open(file_path, newline='') as f, \
            open(clean_file_path, 'w', newline='') as f_clean:
        # Skip the header; the default keeps an empty file from raising
        # StopIteration.
        next(f, None)
        reader = csv.DictReader(f, fieldnames=field_names)
        writer = csv.DictWriter(f_clean,
                                fieldnames=selected_field_names,
                                extrasaction='ignore')
        writer.writeheader()

        for row in reader:
            birth_date = row['birth_date']
            # A blank birth date (how a CSV export renders NULL) yields a
            # blank age instead of aborting the run half-way through the file.
            age = calculate_age(birth_date, datetime_format) if birth_date else ''
            writer.writerow({'email': row['email'], 'age': age})

    return clean_file_path

# Produce the cleaned customers CSV and display its path.
clean_customers_file_path = clean_customers_data(customers_file_path)
clean_customers_file_path

PosixPath('../data/customers_clean.csv')

In [17]:
!head -2 {clean_customers_file_path}

email,age
belovaanna@mail.ru,44


In [18]:
# Export the goods table from the shared database and display the file path.
goods_file_path = get_table_data(SHARED_DSN, 'goods', DATA_PATH)
goods_file_path

PosixPath('../data/goods.csv')

In [19]:
!head -2 {goods_file_path}

id,name,price
25389,Банк триста плод сынок неудобно поезд неожиданный.,1835.98


In [20]:
def clean_goods_data(file_path):
    """Reduce the goods CSV to ``good_title`` and ``price`` columns.

    The source header line is skipped, the second column is exposed as
    ``good_title`` and the ``id`` column is dropped. Values are copied
    unchanged.

    Args:
        file_path: ``pathlib.Path`` of the exported goods CSV with the
            columns id, name, price.

    Returns:
        Path of the cleaned file, ``<stem>_clean.csv`` in the same directory.
    """
    clean_file_path = file_path.parent/f'{file_path.stem}_clean.csv'
    field_names = ('id', 'good_title', 'price')
    selected_field_names = ('good_title', 'price')

    # newline='' lets the csv module manage line endings itself, which keeps
    # newlines embedded in quoted titles intact.
    with open(file_path, newline='') as f, \
            open(clean_file_path, 'w', newline='') as f_clean:
        # Skip the header; the default keeps an empty file from raising
        # StopIteration.
        next(f, None)
        reader = csv.DictReader(f, fieldnames=field_names)
        writer = csv.DictWriter(f_clean,
                                fieldnames=selected_field_names,
                                extrasaction='ignore')
        writer.writeheader()
        writer.writerows(reader)

    return clean_file_path

# Produce the cleaned goods CSV and display its path.
clean_goods_file_path = clean_goods_data(goods_file_path)
clean_goods_file_path

PosixPath('../data/goods_clean.csv')

In [21]:
!head -2 {clean_goods_file_path}

good_title,price
Банк триста плод сынок неудобно поезд неожиданный.,1835.98


**Save to temp tables in Private DB**

In [22]:
def save_table_data(dsn, table_name, file_path):
    """Load a CSV file with a header row into an existing database table.

    The header row supplies the target column names. The remaining rows are
    streamed with ``COPY ... FROM STDIN WITH CSV``, so quoted fields that
    contain commas are parsed correctly and unquoted empty cells are stored
    as NULL. Table and column names are quoted as SQL identifiers, which
    makes them case-sensitive.

    Args:
        dsn: Connection string passed to ``psycopg2.connect``.
        table_name: Name of the destination table.
        file_path: Path of the CSV file to load.

    Returns:
        None.

    Raises:
        ValueError: The file has no header row.
    """
    conn = psycopg2.connect(dsn)
    try:
        with open(file_path, 'r') as f:
            # Parse the header as CSV and trim each name; a plain split(',')
            # would leave the line break attached to the last column name.
            header = next(csv.reader([f.readline()]), [])
            columns = [column.strip() for column in header]
            if not columns:
                raise ValueError(f'{file_path} has no header row')

            copy_query = sql.SQL("COPY {} ({}) FROM STDIN WITH CSV").format(
                sql.Identifier(table_name),
                sql.SQL(', ').join(sql.Identifier(column) for column in columns),
            )

            # The file is now positioned after the header, so only data rows
            # are sent to the server.
            with conn.cursor() as cur:
                cur.copy_expert(copy_query, f)

        conn.commit()
    finally:
        # Closing without a commit discards a partially applied load.
        conn.close()

    return

In [23]:
# Load each cleaned CSV into its *_tmp table in the private database.
# NOTE(review): the tables are presumably created beforehand with columns
# matching the CSV headers - nothing in this notebook section creates them.
save_table_data(PRIVATE_DSN, 'orders_tmp', clean_orders_file_path)
save_table_data(PRIVATE_DSN, 'status_tmp', clean_status_file_path)
save_table_data(PRIVATE_DSN, 'customers_tmp', clean_customers_file_path)
save_table_data(PRIVATE_DSN, 'goods_tmp', clean_goods_file_path)