# In [None]:
import sys
from itertools import cycle
import pdfplumber as pp
import boto3
import pandas as pd
import s3fs
import os
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from datetime import datetime, timedelta
import awswrangler as wr

# Initialize job parameters
# Resolve the required JOB_NAME argument from the command line.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
# Create the Spark context and wrap it in a GlueContext; the Spark session
# is taken from the GlueContext.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Create the Glue Job object and initialise it with the resolved job name
# and the full argument mapping.
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Assign bucket names.
# Bucket the per-license-type CSV files are read from.
source_bucket = 'cannaeyeglass-datalake-processed-dev'
# Bucket the combined CSV files are written to.
dest_bucket = 'cannaeyeglass-datalake-enriched-dev'
# Disabled: no error bucket is assigned in this section.
#error_bucket = 'cannaeyeglass-datalake-errors'

def remove_newline(x):
    """Return *x* with newline characters removed from its string values.

    Intended for a pandas Series (e.g. via ``DataFrame.apply``). Best-effort:
    when *x* does not support the ``.str`` accessor (a non-string Series, or
    an object that is not a Series at all) it is returned unchanged.

    Args:
        x: A pandas Series of strings, or any other object.

    Returns:
        A new Series with every ``'\\n'`` removed, or *x* itself when the
        string accessor is not available.
    """
    try:
        # Removing every '\n' already covers the ' \n' and '\n ' cases; the
        # surrounding spaces are kept, exactly as before.
        return x.str.replace('\n', '', regex=False)
    except (AttributeError, TypeError):
        # No usable .str accessor: leave the input as it is.
        return x

def csvToCombinedcsv(dest_bucket, destination_file, source_bucket, file_path,license_type):
    """Combine the CSV files under one S3 prefix into a single cleaned CSV.

    Reads every ``.csv`` object under ``s3://{source_bucket}/{file_path}/``
    into one DataFrame, keeps the first seven remaining columns under fixed
    names, strips tab / newline / carriage-return characters (both real
    control characters and their backslash-escaped spellings), adds a
    ``license_type`` column and writes the result to
    ``s3://{dest_bucket}/{destination_file}``.

    Args:
        dest_bucket: Bucket the combined CSV is written to.
        destination_file: Object key of the combined CSV in ``dest_bucket``.
        source_bucket: Bucket holding the input CSV files.
        file_path: Key prefix (without trailing slash) of the input files.
        license_type: Value stored in the ``license_type`` column of every row.

    Raises:
        KeyError: If the input has no ``'Unnamed: 0'`` or ``'0'`` column.
        ValueError: If fewer than seven columns remain after dropping
            ``'Unnamed: 0'`` (the column rename then fails).
    """
    s3 = s3fs.S3FileSystem(anon=False)
    raw_df = wr.s3.read_csv(
        path=f's3://{source_bucket}/{file_path}/',
        path_suffix=['.csv'],
        dataset=True,
    )
    # NOTE(review): 'Unnamed: 0' is presumably the index column written by an
    # earlier DataFrame.to_csv call - confirm against the producer job.
    raw_df = raw_df.drop(['Unnamed: 0'], axis=1)
    # Keep only rows that have a value in column '0' (renamed to 'Name' below).
    raw_df = raw_df[raw_df['0'].notnull()]

    # Explicit copy: the frame is modified below, and editing a slice of
    # raw_df triggers SettingWithCopyWarning / acts on a temporary.
    enrich_df = raw_df.iloc[:, :7].copy()
    # NOTE(review): 'Lincence_number' looks like a typo, but it is an output
    # column name that downstream consumers may rely on, so it is kept.
    enrich_df.columns = ['Name','Lincence_number','Email','Phone','City','Zip','County']
    # First pattern removes literal backslash sequences (\t, \n, \r as text),
    # second removes the actual control characters.
    enrich_df = enrich_df.replace(
        to_replace=[r"\\t|\\n|\\r", "\t|\n|\r"],
        value=["", ""],
        regex=True,
    )
    enrich_df['license_type'] = license_type

    # The key is used as given; no trailing '/' is appended, so the object
    # key is a file name rather than a folder-style key.
    with s3.open(f's3://{dest_bucket}/{destination_file}', 'w') as f:
        # The DataFrame index is written as the first column, as before.
        enrich_df.to_csv(f)
# Calling the function with latest available files.
# The run date is captured once so that `date`, `year`, `month` and `day`
# always agree, even if the job starts just before midnight. Change the
# timedelta offset to process the files of an earlier day.
run_date = datetime.today() - timedelta(0)
date = run_date.strftime('%Y%m%d')
year = run_date.strftime('%Y')
month = run_date.strftime('%m')
day = run_date.strftime('%d')

# One input folder (and one combined output file) per license type.
filenames = ['growers','processor','transporter','dispensaries','laboratory','waste_disposal']
for file in filenames:
    # Source prefix holding the day's CSV files for this license type.
    raw_path_dir = f'US/OK/CannabisLifecycle/{file}/{year}/{month}/{day}'
    # Destination key of the combined CSV for this license type.
    combained = f'US/OK/LicenseInfo/{year}/{month}/{day}/{file}{date}.csv'
    csvToCombinedcsv(dest_bucket, combained, source_bucket, raw_path_dir, file)

# Commit the Glue job once every license type has been processed.
job.commit()