In [1]:
import pprint
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyspark as ps
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

plt.style.use('ggplot')

spark = (ps.sql.SparkSession.builder
            .master('local[4]')
            .appName('sparkSQL')
            .getOrCreate())

sc = spark.sparkContext

In [9]:
df = spark.read.options(header='true', inferSchema='true', sep='\t', encoding='UTF-16') \
          .csv('/media/austin/Ubuntu-2/postgres_i502/raw_data/I502/Inventories_0.csv')

In [10]:
print(df.printSchema())

root
 |-- global_id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- mme_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- external_id: string (nullable = true)
 |-- area_id: string (nullable = true)
 |-- batch_id: string (nullable = true)
 |-- lab_result_id: string (nullable = true)
 |-- lab_retest_id: string (nullable = true)
 |-- is_initial_inventory: boolean (nullable = true)
 |-- inventory_created_at: timestamp (nullable = true)
 |-- inventory_packaged_at: timestamp (nullable = true)
 |-- created_by_mme_id: string (nullable = true)
 |-- qty: double (nullable = true)
 |-- uom: string (nullable = true)
 |-- strain_id: string (nullable = true)
 |-- inventory_type_id: string (nullable = true)
 |-- additives: string (nullable = true)
 |-- serving_num: integer (nullable = true)
 |-- sent_for_testing: boolean (nullable = true)
 |-- deleted_at: timestamp (nullable = true)
 |-- medically_complian

In [11]:
# remove "�" from dataset. Pandas read this in fine, but pyspark will not.
df = df.filter(~col(df.columns[0]).like("%�%"))
remove_last_char_in_str = udf(lambda x: x[:-1], StringType())
df = (df.withColumn(df.columns[-1], remove_last_char_in_str(df.columns[-1])));

In [12]:
df.createOrReplaceTempView('inventory')

In [13]:
df.first()

Row(global_id='WAJ412598.IN1', created_at=datetime.datetime(2018, 1, 31, 17, 24, 32), updated_at=datetime.datetime(2018, 2, 28, 1, 13, 28), mme_id='WAWA1.MMDJ', user_id='WAWA1.USAM', external_id=None, area_id='WAJ412598.AR3', batch_id='WAJ412598.BA1', lab_result_id=None, lab_retest_id=None, is_initial_inventory=False, inventory_created_at=datetime.datetime(1900, 1, 1, 0, 0), inventory_packaged_at=datetime.datetime(1900, 1, 1, 0, 0), created_by_mme_id=None, qty=0.0, uom='ea', strain_id='WAJ412598.STX', inventory_type_id='WAJ412598.TY1S', additives=None, serving_num=None, sent_for_testing=False, deleted_at=None, medically_compliant=None, legacy_id=None, lab_results_attested='0', lab_results_date=None, global_original_id='')

In [16]:
r = spark.sql('''SELECT COUNT(global_id)
                      FROM inventory
                      WHERE created_at > CAST("2019-12-31" AS DATE)
                         AND lab_result_id NOT LIKE "None"
                   ''')
r.show()

+----------------+
|count(global_id)|
+----------------+
|         2152180|
+----------------+



In [17]:
result = spark.sql('''SELECT global_id
                             , created_at
                             , mme_id
                             , user_id
                             , external_id
                             , batch_id
                             , lab_result_id
                             , created_by_mme_id
                             , strain_id
                             , inventory_type_id
                             , global_original_id
                      FROM inventory
                      WHERE created_at >= CAST("2020-01-01" AS DATE)
                         AND lab_result_id NOT LIKE "None"
                   ''')
result.show()

+-----------------+-------------------+-----------+-----------+--------------------+-----------------+-----------------+-----------------+----------------+-----------------+------------------+
|        global_id|         created_at|     mme_id|    user_id|         external_id|         batch_id|    lab_result_id|created_by_mme_id|       strain_id|inventory_type_id|global_original_id|
+-----------------+-------------------+-----------+-----------+--------------------+-----------------+-----------------+-----------------+----------------+-----------------+------------------+
|WAJ416392.INFOUAZ|2019-12-31 00:00:12| WAWA1.MMSQ|WAWA1.US1YB|                null|WAJ416392.BAFTDIO|     WAL4.LR134NZ|             null|WAJ416392.ST1HQL|WAJ416392.TYDE1II|                  |
|WAJ417152.INFOUB1|2019-12-31 00:00:23|WAWA1.MM12O|WAWA1.US123|                null|WAJ417152.BAFTDJE|     WAL9.LR13GP8|             null|WAJ417152.ST1NYS|WAJ417152.TY8YXRZ|                  |
|WAJ417152.INFOUB2|2019-12-31 00:00

In [33]:
for month in range(1,13):
    r = spark.sql(f'''SELECT global_id
                             , created_at
                             , mme_id
                             , user_id
                             , external_id
                             , batch_id
                             , lab_result_id
                             , created_by_mme_id
                             , strain_id
                             , inventory_type_id
                             , global_original_id
                      FROM inventory
                      WHERE created_at >= CAST("2020-{month}-01" AS DATE)
                         AND lab_result_id NOT LIKE "None"
    ''')
    r.coalesce(1).write.csv(f'inventory_month_{month}.csv', header='true')

In [35]:
df_2 = pd.read_csv('/home/austin/code/dsi/capstone_1/inventory_month_1.csv/part-00000-3d7a9777-9ab2-4297-8f76-d19a7794b661-c000.csv')

In [36]:
df_2.head()

Unnamed: 0,global_id,created_at,mme_id,user_id,external_id,batch_id,lab_result_id,created_by_mme_id,strain_id,inventory_type_id,global_original_id
0,WAJ412217.INFPLZB,2020-01-01T00:04:00.000-08:00,WAWA1.MMG,WAWA1.US6I,f4434b94-f35e-4e10-bead-48a48c3f15d6,WAJ412217.BAFU05U,WAATTESTE.LR7HLB,WAWA1.MMJ9,WAJ412217.STNDGY,WAJ412217.TYDLBH3,WAR415333.INFH2S6
1,WAR424751.INFPLZE,2020-01-01T00:04:39.000-08:00,WAWA1.MMB8,WAWA1.US91,gf:qsEfj-AQseoGRSPMr,WAR424751.BAFU05V,WAL8.LR12WHA,WAWA1.MMX8,,WAR424751.TYBF0JR,WAJ414289.INFL5R7
2,WAR424751.INFPLZF,2020-01-01T00:04:39.000-08:00,WAWA1.MMB8,WAWA1.US91,gf:epnYz_MrsKWS_NjYr,WAR424751.BAFU05W,WAL8.LR131KT,WAWA1.MMX8,,WAR424751.TYBF0JR,WAJ414289.INFL5R8
3,WAR424751.INFPLZG,2020-01-01T00:04:39.000-08:00,WAWA1.MMB8,WAWA1.US91,gf:TQOlKrLDEUCkPoVGP,WAR424751.BAFU05X,WAL8.LR12PG7,WAWA1.MMX8,,WAR424751.TYBF0JS,WAJ414289.INFL5R6
4,WAR424751.INFPLZH,2020-01-01T00:04:39.000-08:00,WAWA1.MMB8,WAWA1.US91,gf:Ady_cshdypUYFwOpk,WAR424751.BAFU05Y,WAL8.LR131KX,WAWA1.MMX8,,WAR424751.TYBF0JR,WAJ414289.INFL9BP


In [40]:
for col in df_2.columns:
    m = max(df_2[col].value_counts())
    print(col, m)

global_id 2
created_at 40
mme_id 468229
user_id 560079
external_id 1548
batch_id 4450
lab_result_id 817271
created_by_mme_id 420441
strain_id 152064
inventory_type_id 14802
global_original_id 2771
