# Set up PySpark

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, to_date
# Get the active SparkSession, or create one if none exists yet
spark = SparkSession.builder.getOrCreate()

In [None]:
# Explicit column names and types for each CSV file
pets_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("PetID", StringType()),
        ("Name", StringType()),
        ("Kind", StringType()),
        ("Gender", StringType()),
        ("Age", IntegerType()),
        ("OwnerID", StringType()),
    )
])

owners_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("OwnerID", StringType()),
        ("Name", StringType()),
        ("Surname", StringType()),
        ("StreetAddress", StringType()),
        ("City", StringType()),
        ("State", StringType()),
        ("StateFull", StringType()),
        ("ZipCode", StringType()),
    )
])

proceduresdetails_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ProcedureType", StringType()),
        ("ProcedureSubCode", StringType()),
        ("Description", StringType()),
        ("Price", DoubleType()),
    )
])

# ProcedureDate is declared as a string here; it is turned into a date
# after the file has been loaded.
procedureshistory_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("PetID", StringType()),
        ("ProcedureDate", StringType()),
        ("ProcedureType", StringType()),
        ("ProcedureSubCode", StringType()),
    )
])

In [None]:
# Load each CSV into a Spark DataFrame using its explicit schema
sp_pets = spark.read.csv(
    'data/Pets.csv',
    schema=pets_schema,
    header=True,
)
sp_owners = spark.read.csv(
    'data/Owners.csv',
    schema=owners_schema,
    header=True,
)
sp_proceduresdetails = spark.read.csv(
    'data/ProceduresDetails.csv',
    schema=proceduresdetails_schema,
    header=True,
)
sp_procedureshistory = spark.read.csv(
    'data/ProceduresHistory.csv',
    schema=procedureshistory_schema,
    header=True,
)

# ProcedureDate was loaded as a string; parse it into a date column in place
sp_procedureshistory = sp_procedureshistory.withColumn(
    'ProcedureDate',
    to_date(col('ProcedureDate'), 'yyyy/MM/dd'),
)

In [None]:
# Change the column names to lowercase
def col_to_lowercase(df):
    """Return ``df`` with every column renamed to its lowercase form.

    ``df`` is expected to expose ``columns`` and ``withColumnRenamed`` the way
    a Spark DataFrame does. The input object itself is not modified; the
    renamed DataFrame is returned.
    """
    renamed = df
    for original_name in df.columns:
        renamed = renamed.withColumnRenamed(original_name, original_name.lower())
    return renamed
# Apply the lowercase renaming to all four Spark DataFrames
sp_pets, sp_owners, sp_proceduresdetails, sp_procedureshistory = map(
    col_to_lowercase,
    (sp_pets, sp_owners, sp_proceduresdetails, sp_procedureshistory),
)

# Set up PostgreSQL

In [None]:
import psycopg2
import pandas as pd
from db_creds import creds

In [None]:
class DataBase:
    """Small convenience wrapper around psycopg2 for one PostgreSQL database.

    ``fetch`` and ``execute`` each open their own connection, run a single
    statement, commit, and close the connection again, so nothing is kept
    open between calls.
    """

    def __init__(self, host, dbname, username, password, port):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.port = port
        self.dbname = dbname
        self.username = username
        self.password = password

    def __connect__(self):
        """Opens a new connection and initiates a cursor on it"""
        self.con = self.conn()
        self.cur = self.con.cursor()

    def __disconnect__(self):
        """Commits any changes to the database and closes connection"""
        try:
            self.con.commit()
        finally:
            # Close even when the commit fails so the connection is not leaked.
            self.con.close()

    def conn(self):
        """Open a new connection, store it as ``self.con`` and return it.

        The caller owns the returned connection and must close it.
        """
        self.con = psycopg2.connect(
            host=self.host,
            port=self.port,
            user=self.username,
            password=self.password,
            database=self.dbname,
        )
        return self.con

    def fetch(self, sql, variables=None):
        """Connects to database, fetches data specific to sql query, then disconnects from database

        Returns the rows from ``cursor.fetchall()``. If the query raises, the
        error is printed and ``None`` is returned instead.
        """
        self.__connect__()
        try:
            self.cur.execute(sql, variables)
            return self.cur.fetchall()
        except Exception as e:
            # Best-effort by design: report the problem and let the caller go on.
            print(e)
            return None
        finally:
            self.__disconnect__()

    def execute(self, sql, variables=None):
        """Connects to database, executes sql query, along with any variables, then disconnects from database

        Errors raised by the statement are printed rather than propagated.
        """
        self.__connect__()
        try:
            self.cur.execute(sql, variables)
        except Exception as e:
            # Best-effort by design: report the problem and let the caller go on.
            print(e)
        finally:
            self.__disconnect__()

    def get_cols(self, table, details='no'):
        """Describe the columns of ``table`` in the ``public`` schema.

        With ``details='yes'`` each column's name, data type and maximum
        character length are printed and ``None`` is returned. Otherwise the
        column names are returned as one ``', '``-separated string, which is
        empty when the table has no columns or the lookup failed.
        """
        # Filter and order on the server instead of fetching every column of
        # the schema and slicing rows by position.
        rows = self.fetch(
            """
            SELECT column_name, data_type, character_maximum_length
            FROM information_schema.columns
            WHERE table_schema = 'public'
              AND table_name = %s
            ORDER BY ordinal_position;
            """,
            (table,),
        )
        # fetch() returns None when the query failed; treat that as "no columns".
        rows = rows or []
        if details == 'yes':
            for column_name, data_type, max_length in rows:
                print(column_name, data_type, max_length)
            return None
        return ', '.join(row[0] for row in rows)

    def get_tables(self):
        """Return the result of ``fetch`` for the table names in the ``public`` schema, ordered by name."""
        return self.fetch('''
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'
            ORDER BY table_name;
            ''')

In [None]:
# Pull the connection settings out of the credentials mapping and build the wrapper
username, password, host, dbname, port = (
    creds[key] for key in ('username', 'password', 'host', 'dbname', 'port')
)
db = DataBase(host=host, dbname=dbname, username=username, password=password, port=port)

In [None]:
# Create the pets table (does nothing if it already exists).
# Column names are the lowercase forms of the pets_schema field names.
db.execute('''
    CREATE TABLE IF NOT EXISTS pets (
    petid varchar,
    name varchar,
    kind varchar,
    gender varchar,
    age int,
    ownerid varchar
)
''')

In [None]:
# Create the owners table (does nothing if it already exists).
# The state column is limited to 2 characters; the full name goes in statefull.
db.execute('''
CREATE TABLE IF NOT EXISTS owners (
    ownerid varchar,
    name varchar,
    surname varchar,
    streetaddress varchar,
    city varchar,
    state varchar(2),
    statefull varchar,
    zipcode varchar
)
''')

In [None]:
# Create the proceduresdetails table (does nothing if it already exists)
db.execute('''
CREATE TABLE IF NOT EXISTS proceduresdetails (
    proceduretype varchar,
    proceduresubcode varchar,
    description varchar,
    price float
)
''')

In [None]:
# Create the procedureshistory table (does nothing if it already exists)
db.execute('''
CREATE TABLE IF NOT EXISTS procedureshistory (
    petid varchar,
    proceduredate date,
    proceduretype varchar,
    proceduresubcode varchar
)
''')

In [None]:
# Check tables have been created: lists the table names in the public schema.
# get_tables() yields None if the lookup query failed (the error is printed).
db.get_tables()

## With COPY statement if you have superuser rights on Postgres

In [None]:
# Copy pets data from the comma-delimited CSV, skipping its header row.
# NOTE(review): COPY ... FROM '<file>' is read by the PostgreSQL server process,
# so this relative path must resolve on the server side - confirm for your setup.
db.execute('''
    COPY pets FROM 'data/Pets.csv' DELIMITER ',' CSV HEADER
''')

In [None]:
# Copy owners data from the comma-delimited CSV, skipping its header row.
# NOTE(review): the file is read by the PostgreSQL server process, so the
# relative path must resolve on the server side - confirm for your setup.
db.execute('''
    COPY owners FROM 'data/Owners.csv' DELIMITER ',' CSV HEADER
''')

In [None]:
# Copy proceduresdetails data from the comma-delimited CSV, skipping its header row.
# NOTE(review): the file is read by the PostgreSQL server process, so the
# relative path must resolve on the server side - confirm for your setup.
db.execute('''
    COPY proceduresdetails FROM 'data/ProceduresDetails.csv' DELIMITER ',' CSV HEADER
''')

In [None]:
# Copy procedureshistory data from the comma-delimited CSV, skipping its header row.
# NOTE(review): the file is read by the PostgreSQL server process, so the
# relative path must resolve on the server side - confirm for your setup.
db.execute('''
    COPY procedureshistory FROM 'data/ProceduresHistory.csv' DELIMITER ',' CSV HEADER
''')

## With pandas DataFrame.to_sql()

An alternative if you do not have superuser access to your Postgres database

In [None]:
from urllib.parse import quote

from sqlalchemy import create_engine

# Percent-encode the credentials before placing them in the URL: a raw '@',
# ':', '/' or '%' in the username or password would otherwise be parsed as
# URL structure and break the connection string.
engine = create_engine(
    'postgresql+psycopg2://'
    f"{quote(str(username), safe='')}:{quote(str(password), safe='')}"
    f'@{host}:{port}/{dbname}'
)

In [13]:
# Load the four CSV files into pandas DataFrames.
# NOTE(review): no dtype is given, so pandas infers column types - confirm that
# ID/code columns keep the exact text expected by the varchar table columns.
pets, owners, proceduresdetails, procedureshistory = map(
    pd.read_csv,
    (
        'data/Pets.csv',
        'data/Owners.csv',
        'data/ProceduresDetails.csv',
        'data/ProceduresHistory.csv',
    ),
)

In [14]:
# Lowercase every column label so the DataFrames line up with the database column names
pets, owners, proceduresdetails, procedureshistory = (
    frame.rename(columns=str.lower)
    for frame in (pets, owners, proceduresdetails, procedureshistory)
)

In [None]:
# Append each DataFrame's rows to its table, without writing the DataFrame index.
# NOTE: if_exists='append' adds to whatever the table already holds, so loading
# the same data a second time duplicates the rows.
pets.to_sql(name='pets', con=engine, index=False, if_exists='append')
owners.to_sql(name='owners', con=engine, index=False, if_exists='append')
proceduresdetails.to_sql(name='proceduresdetails', con=engine, index=False, if_exists='append')
procedureshistory.to_sql(name='procedureshistory', con=engine, index=False, if_exists='append')

In [18]:
# Verify # of records in each table against the matching pandas DataFrame
tables = [['pets', pets],['owners', owners],['proceduresdetails', proceduresdetails],['procedureshistory', procedureshistory]]
for table_name, frame in tables:
    # Table names come from the hard-coded list above, never from user input.
    rows = db.fetch(f'''
        SELECT COUNT(*) FROM {table_name}
    ''')
    if not rows:
        # fetch() prints the database error and returns None when the query
        # fails; report that here instead of crashing on None[0][0].
        print (f'{table_name} Error: could not read the record count from the SQL table.')
        continue
    db_count = rows[0][0]
    expected_count = len(frame)
    if db_count == expected_count:
        print (f'{table_name} table verified.')
    else:
        print (f'{table_name} Error: {db_count} records in SQL table & {expected_count} records in Pandas dataframe.')

pets table verified.
owners table verified.
proceduresdetails table verified.
procedureshistory table verified.
