In [5]:
! pip install pyspark boto3

Collecting boto3
  Downloading boto3-1.34.2-py3-none-any.whl (139 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
Collecting botocore<1.35.0,>=1.34.2 (from boto3)
  Downloading botocore-1.34.2-py3-none-any.whl (11.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.8/11.8 MB[0m [31m27.6 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting s3transfer<0.10.0,>=0.9.0 (from boto3)
  Downloading s3transfer-0.9.0-py3-none-any.whl (82 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m82.0/82.0 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: jmespath, botocore, s3transfer, boto3
Successfully installed boto3-1.34.2 botocore-1.34.2 jmespath-1.0.1 s3transfer-0.9.0


In [6]:
import pandas as pd
import pyspark
import boto3

In [7]:
from pyspark.sql import SparkSession

In [8]:
# S3 resource for the project bucket's region.
# Credentials are NOT hardcoded here: boto3 resolves them through its default
# provider chain (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars, the shared
# ~/.aws/credentials file, or an attached IAM role). Passing empty strings would
# instead be treated as explicit (invalid) credentials.
s3 = boto3.resource(
    service_name='s3',
    region_name='us-east-2',
)

In [18]:
# Fetch the raw open_fda extract from S3 and parse its streamed body with pandas,
# using the first CSV column as the row index.
raw_key = 'raw/2023-12-12_open_fda_raw.csv'
obj = s3.Bucket('dsc-bucket-1').Object(raw_key).get()
pd_df = pd.read_csv(obj['Body'], index_col=0)

In [19]:
# Start (or reuse) the Spark session for this notebook, named 'Practice'.
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [20]:
# Show the SparkSession object as this cell's output.
spark

In [21]:
# Convert the pandas frame loaded from S3 into a Spark DataFrame for the UDF-based transforms.
df = spark.createDataFrame(pd_df)

In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import re
import random

# Reuse the active Spark session. getOrCreate() returns the session started
# earlier in this notebook, so giving it a different appName here would be misleading.
spark = SparkSession.builder.getOrCreate()

# Plain Python parser for free-text quantities; it is wrapped as a Spark UDF below.
def extract_numeric_udf(text):
    """Sum every number that appears in a free-text quantity string.

    Numbers may use comma thousands separators ("221,097") and may have a
    decimal part ("2.5"). All matches are added together, so
    "10 kits and 2.5 boxes" yields 12.5.

    Args:
        text: Raw quantity text. Non-string values (None, NaN, numbers) are
            not parsed.

    Returns:
        The sum as a float (0.0 when the string contains no digits), or None
        for non-string input.
    """
    if not isinstance(text, str):
        return None  # Return None for non-string or None values

    # A digit, then any run of digits/commas, then an optional decimal fraction.
    matches = re.findall(r'\d[\d,]*(?:\.\d+)?', text)

    # Always return a float: the Spark UDF is declared FloatType, and a plain
    # Python int (e.g. sum([]) == 0) would come back as null instead of 0.0.
    return float(sum(float(m.replace(',', '')) for m in matches))

# Wrap the Python parser as a Spark UDF returning FloatType, then add its output
# as the 'product_quantity_cleaned' column.
extract_numeric_spark_udf = udf(f=extract_numeric_udf, returnType=FloatType())
quantity_cleaned_col = extract_numeric_spark_udf(df["product_quantity"])
df = df.withColumn("product_quantity_cleaned", quantity_cleaned_col)

def cost_udf(val):
    """Estimate a randomized recall cost in dollars from a unit quantity.

    The cost is ``val`` times a random integer multiplier drawn with
    ``random.randint`` from a band that depends on the quantity:

    * val < 1000:              multiplier in [800, 1000]
    * 1000 <= val < 10000:     multiplier in [200, 500]
    * 10000 <= val < 100000:   multiplier in [10, 20]
    * val >= 100000:           multiplier in [1, 5], then scaled by 0.01

    NOTE(review): `random` is unseeded here (and runs inside Spark Python
    workers), so results are not reproducible across runs.

    Args:
        val: Parsed quantity (a Python float when called through the
            FloatType Spark UDF), or None for a null quantity.

    Returns:
        The estimated cost as a float, or None when ``val`` is None or NaN.
    """
    if val is None:
        # Null quantities arrive as None; comparing None with an int raises TypeError.
        return None
    if val < 1000:
        return float(val * random.randint(800, 1000))
    if val < 10000:
        return float(val * random.randint(200, 500))
    if val < 100000:
        # Lower bound 10000 is inclusive (implied by the previous branch).
        return float(val * random.randint(10, 20))
    if val >= 100000:
        return float(val * random.randint(1, 5) * 0.01)
    # Only NaN fails every comparison above.
    return None

# Register the cost estimator as a Spark UDF. It draws random multipliers, so it
# is marked non-deterministic; otherwise Spark's optimizer may assume repeated
# invocations return the same value (deduplicating or re-evaluating them).
cost_spark_udf = udf(cost_udf, FloatType()).asNondeterministic()

# Apply the UDF to the 'product_quantity_cleaned' column and create a new 'cost_to_recall_in_dollars' column
df = df.withColumn("cost_to_recall_in_dollars", cost_spark_udf(df["product_quantity_cleaned"]))

# Collect the transformed Spark DataFrame to the driver as a pandas DataFrame.
pd_df_transformed = df.toPandas()


In [24]:
# Display the transformed pandas DataFrame (pandas truncates long frames in the rendered output).
pd_df_transformed

Unnamed: 0,recall_status,recalling_firm,address_1,address_2,city,state,postal_code,product_code,root_cause_description,product_quantity,device_class,product_quantity_cleaned,cost_to_recall_in_dollars
0,Terminated,Bio-Logic Systems Corp,1 Bio Logic Plaza,,Mundelein,IL,60060-3708,GWQ,Device Design,311 units,2,311.0,2.780340e+05
1,Terminated,Philips Medical Systems North America Co. Phil...,22100 Bothell Everett Hwy,,Bothell,WA,98021-8431,JAA,Software design,19 units in the US.,2,19.0,1.639700e+04
2,Terminated,Penumbra Inc.,1351 Harbor Bay Pkwy,,Alameda,CA,94502-6541,DQY,Device Design,2246 units,2,2246.0,1.111770e+06
3,Terminated,Baxter Healthcare Corp. Rt.,120 & Wilson Rd,,Round Lake,IL,60073,FRN,Device Design,"221,097 pumps",2,221097.0,2.210970e+03
4,Terminated,"Medtronic Navigation, Inc",826 Coal Creek Circle,,Louisville,CO,80027-9710,HAW,Labeling design,6 probes,2,6.0,4.980000e+03
...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,Terminated,"Deerfield Imaging, Inc.",5101 Shady Oak Rd S,,Minnetonka,MN,55343-4100,HBL,Device Design,3 units,2,3.0,2.469000e+03
9996,Terminated,"Siemens Medical Solutions USA, Inc",4040 Nelson Ave,,Concord,CA,94520-1200,IYE,Software design,52 units,2,52.0,4.815200e+04
9997,Terminated,Varian Medical Systems Oncology Systems,911 Hansen Way,,Palo Alto,CA,94304-1028,IYE,Software design,18,2,18.0,1.652400e+04
9998,Terminated,Siemens Medical Solutions USA Inc.,810 Innovation Dr,,Knoxville,TN,37932-2562,KPS,Device Design,2 units,2,2.0,1.778000e+03


In [25]:
# Write without the pandas index: toPandas() produces a plain 0..n-1 RangeIndex
# that carries no information, and writing it would come back as yet another
# "Unnamed: 0" column when the CSV is re-read.
pd_df_transformed.to_csv('open_fda_transformed.csv', index=False)

In [26]:
# Upload the locally written CSV to the bucket root under the same file name.
output_file = 'open_fda_transformed.csv'
s3.Bucket('dsc-bucket-1').upload_file(Filename=output_file, Key=output_file)