In [1]:
!pip install --upgrade google-cloud-storage



In [None]:
import os
from google.cloud import storage
# Tell the Google client libraries which service-account key file to use.
# NOTE(review): this is a hard-coded Colab path to a key file; keep the key itself
# out of version control and consider making the path configurable.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/content/southern-gasket-395903-48415c815598.json'


In [None]:
# Create the Cloud Storage client shared by the helper functions defined in later cells.
storage_client = storage.Client()
# Exploratory: list the attributes and methods available on the client.
dir(storage_client)

In [None]:
#Create new bucket
bucket_name = "aivn_testing_bucket"
# lookup_bucket returns None when the bucket does not exist, so the bucket is only
# created on the first run; re-running the cell reuses the existing bucket instead
# of failing because the name is already taken.
bucket = storage_client.lookup_bucket(bucket_name)
if bucket is None:
    bucket = storage_client.create_bucket(bucket_name, location="US")

In [None]:
#access bucket
# Fetch a handle to an existing bucket by name (get_bucket raises if it cannot be retrieved).
my_bucket = storage_client.get_bucket("aivndemobigdata")

In [None]:
#download file
def download_file_from_bucket(blob_name, file_path, bucket_name):
  """Download one object from a bucket to a local file.

  Uses the module-level `storage_client`.

  Args:
    blob_name: Name (path) of the object inside the bucket.
    file_path: Local destination path; the file is opened in "wb" mode.
    bucket_name: Name of the bucket holding the object.

  Returns:
    True if the download completed, False if any exception was raised
    (the exception is printed, not re-raised).
  """
  import os

  file_opened = False
  try:
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(blob_name)
    with open(file_path, "wb") as f:
      file_opened = True
      storage_client.download_blob_to_file(blob, f)
    return True
  except Exception as e:
    print(e)
    # Opening with "wb" already created/truncated the destination, so a failed
    # transfer would otherwise leave an empty or partial file that looks like a
    # successful download. Only remove it if this call actually opened it.
    if file_opened and os.path.exists(file_path):
      os.remove(file_path)
    return False


In [None]:
# Copy the advertising CSV from the bucket to a local file; the cell shows the helper's True/False result.
download_file_from_bucket("asteroid/advertising.csv", "testing.csv", "aivndemobigdata")

True

In [None]:
#upload file
def upload_to_bucket(blob_name, file_path, bucket_name):
  """Upload a local file to a bucket under the given object name.

  Uses the module-level `storage_client`.

  Args:
    blob_name: Destination object name (path) inside the bucket.
    file_path: Path of the local file to upload.
    bucket_name: Name of the target bucket.

  Returns:
    True when the upload call completes, False when any exception is raised
    (the exception is printed, not re-raised).
  """
  try:
    target_blob = storage_client.get_bucket(bucket_name).blob(blob_name)
    target_blob.upload_from_filename(file_path)
  except Exception as e:
    print(e)
    return False
  else:
    return True



In [None]:
# Upload a local CSV into the bucket.
# NOTE(review): the local file is BostonHousing.csv but it is stored as asteroid/Movies.csv — confirm this naming is intended.
upload_to_bucket("asteroid/Movies.csv","/content/BostonHousing.csv", "aivndemobigdata")

True

In [None]:
# Print the name of every object currently stored in the bucket.
bucket = storage_client.get_bucket("aivndemobigdata")
blobs = bucket.list_blobs()
for blob in blobs:
    print(blob.name)

asteroid/
asteroid/Movies.csv
asteroid/advertising.csv


In [None]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session with the GCS Hadoop connector jar attached.
spark = (
    SparkSession.builder
    .config("spark.jars", "/content/drive/MyDrive/AI2023/gcs-connector-hadoop3-latest.jar")
    .getOrCreate()
)

# Hadoop settings that let Spark read gs:// paths with the service-account key file.
for conf_key, conf_value in [
    ('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem'),
    # This is required if you are using service account and set true,
    ('fs.gs.auth.service.account.enable', 'true'),
    ('google.cloud.auth.service.account.json.keyfile', "/content/southern-gasket-395903-48415c815598.json"),
]:
    spark._jsc.hadoopConfiguration().set(conf_key, conf_value)


In [None]:
input_dir = "gs://aivndemobigdata/asteroid/"
# Load the advertising CSV from GCS, taking column names from the header row and
# letting Spark infer the column types.
df = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .load(f"{input_dir}advertising.csv")
)
df.head()

Row(TV=230.1, Radio=37.8, Newspaper=69.2, Sales=22.1)

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
pd.set_option('display.max_columns', None)

In [None]:
!pip install gcsfs



In [None]:
import pandas as pd
# Read the same CSV straight into pandas; `storage_options` passes the key file to the GCS filesystem backend.
# Note: this rebinds `df` from a Spark DataFrame to a pandas DataFrame.
df = pd.read_csv('gcs://aivndemobigdata/asteroid/advertising.csv',
                 storage_options={"token": "/content/southern-gasket-395903-48415c815598.json"})
print(df.head())

      TV  Radio  Newspaper  Sales
0  230.1   37.8       69.2   22.1
1   44.5   39.3       45.1   10.4
2   17.2   45.9       69.3   12.0
3  151.5   41.3       58.5   16.5
4  180.8   10.8       58.4   17.9


In [None]:
import pandas as pd
# Read Movies.csv from GCS into pandas.
# NOTE(review): this cell uses a different bucket ("aivnbigdata") and a different key
# file than the earlier cells — confirm both are intended.
dataset = pd.read_csv('gcs://aivnbigdata/asteroid/Movies.csv',
                 storage_options={"token": "/content/angular-yeti-389702-d5b06204e451.json"})
# Preview the frame that was just loaded (the original printed the unrelated `df`).
print(dataset.head())

### Big data: Small Body Dataset

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import regexp_replace

pd.set_option('display.max_columns', None)

In [None]:
bucket_name = "aivnbigdata"
path = f"gs://{bucket_name}/asteroid/Asteroid_Updated.csv"

# Comma-separated file with a header row; column types are inferred by Spark.
df = (
    spark.read
    .option("sep", ',')
    .option("inferSchema", True)
    .option("header", True)
    .csv(path)
)

In [None]:
def preview(df, n=20):
    """Return the first `n` rows of a Spark DataFrame as a pandas DataFrame.

    Args:
        df: Object exposing `take(n)` and `columns` (a Spark DataFrame here).
        n: Number of rows to fetch; defaults to 20.

    Returns:
        A pandas DataFrame built from the fetched rows, with the same column names.
    """
    first_rows = df.take(n)
    column_names = df.columns
    return pd.DataFrame(first_rows, columns=column_names)

# Show the first 20 rows (the helper's default) as a pandas table.
preview(df)

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL under the name "df".
df.createOrReplaceTempView("df")

# Bodies whose diameter exceeds 500.
# NOTE(review): a later cell casts `diameter` to double, so it may not be numeric here and
# the comparison would then rely on Spark's implicit cast — confirm.
query = """
          SELECT class, diameter, name
          FROM df
          WHERE diameter > 500
        """

spark.sql(query).show()

In [None]:


# Keep only rows that have a diameter value (it is the label column used by the models below).
astro_df = df.dropna(subset=['diameter'])

# Remove columns that are not used downstream.
astro_df = astro_df.drop(
    'extent', 'rot_per', 'GM', 'BV', 'UB', 'IR',
    'spec_B', 'spec_T', 'G', 'data_arc', 'H', 'albedo',
)

# Rewrite the Y/N flags as True/False text so they can be cast to boolean further down.
for column in ['neo', 'pha']:
    astro_df = astro_df.withColumn(column, regexp_replace(column, 'Y', 'True'))
    astro_df = astro_df.withColumn(column, regexp_replace(column, 'N', 'False'))

# Numeric columns are cast to double.
for column in ['n_obs_used', 'diameter']:
    astro_df = astro_df.withColumn(column, astro_df[column].cast('double'))

# Flags become 1/0 integers.
for column in ['neo', 'pha']:
    astro_df = astro_df.withColumn(column, astro_df[column].cast('boolean').cast('int'))

# Drop rows whose diameter is null after the cast to double.
astro_df = astro_df.dropna(subset=['diameter'])

In [None]:
# Index each categorical column, then one-hot encode the resulting index.
condition_code_indexer = StringIndexer(
    inputCol="condition_code",
    outputCol="condition_codeIndex",
)
class_indexer = StringIndexer(
    inputCol="class",
    outputCol="classIndex",
)
onehotencoder_condition_code_vector = OneHotEncoder(
    inputCol="condition_codeIndex",
    outputCol="condition_code_vec",
)
onehotencoder_class_vector = OneHotEncoder(
    inputCol="classIndex",
    outputCol="class_vec",
)

# Both indexers run before the encoders that consume their output columns.
encoding_stages = [
    condition_code_indexer,
    class_indexer,
    onehotencoder_condition_code_vector,
    onehotencoder_class_vector,
]
encoding_pipeline = Pipeline(stages=encoding_stages)

encoding_model = encoding_pipeline.fit(astro_df)
astro_df = encoding_model.transform(astro_df)

In [None]:
# The raw categorical columns and their indices are replaced by the one-hot vectors.
astro_df = astro_df.drop('condition_code', 'class', 'condition_codeIndex', 'classIndex')

# Column order matters: 'name' first (identifier), 'diameter' last (label).
model_columns = [
    'name',
    "a", "e", "i", 'om', 'w', 'q', 'ad', 'per_y', 'n_obs_used',
    'neo', 'pha', 'moid', 'n', 'per', 'ma',
    'condition_code_vec', 'class_vec',
    'diameter',
]
astro_df = astro_df.select(*model_columns)

# Everything between the identifier and the label is a feature.
features = astro_df.schema.names[1:-1]

In [None]:
# 60/20/20 split with a fixed seed.
train, val, test = astro_df.randomSplit([0.6, 0.2, 0.2], seed=42)

# 'name' is only an identifier, so it is removed from the modelling frames.
train_df, val_df, test_df = [part.drop('name') for part in (train, val, test)]

In [None]:
# Pack all feature columns into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=features, outputCol='features')

# After assembling, the individual feature columns are no longer needed.
test_pack = assembler.transform(test_df).drop(*features)
train_pack = assembler.transform(train_df).drop(*features)
val_pack = assembler.transform(val_df).drop(*features)

In [None]:
# Fit the min-max scaler on the training split only, then apply it to all three splits.
# NOTE(review): the estimators in the next cell are configured with featuresCol='features',
# so the 'scaledFeatures' column produced here is not consumed by them — confirm intent.
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(train_pack)

train_pack, val_pack, test_pack = [
    scaler_model.transform(pack) for pack in (train_pack, val_pack, test_pack)
]

In [None]:
# Gradient-boosted trees: fit on the training split, predict on the validation split.
gbt = GBTRegressor(
    featuresCol='features',
    labelCol='diameter',
    maxIter=100,
    maxDepth=5,
    seed=42,
    lossType='squared',
    stepSize=.1,
)
gbt_model = gbt.fit(train_pack)
gbt_pred = gbt_model.transform(val_pack)

# Random forest.
rf = RandomForestRegressor(
    featuresCol='features',
    labelCol='diameter',
    maxDepth=5,
    seed=42,
    bootstrap=True,
    numTrees=100,
)
rf_model = rf.fit(train_pack)
rf_pred = rf_model.transform(val_pack)

# Elastic-net linear regression.
lr = LinearRegression(
    featuresCol='features',
    labelCol='diameter',
    maxIter=100,
    loss='squaredError',
    elasticNetParam=0.5,
    regParam=0.1,
    fitIntercept=True,
    standardization=True,
    solver='auto',
    tol=.1,
)
lr_model = lr.fit(train_pack)
lr_pred = lr_model.transform(val_pack)

In [None]:
# One evaluator per metric, both comparing 'prediction' against the 'diameter' label.
rmse = RegressionEvaluator(
    labelCol="diameter", predictionCol="prediction", metricName="rmse")

r2 = RegressionEvaluator(
    labelCol="diameter", predictionCol="prediction", metricName="r2")

metrics = [rmse, r2]
metric_labels = ['rmse', 'r2']

predictions = [lr_pred, rf_pred, gbt_pred]
predict_labels = ['LR', 'RF', 'GBT']

# Build one Series of metric values per model, named after the model.
eval_list = []
for name, predict in zip(predict_labels, predictions):
    scores = {
        label: evaluator.evaluate(predict)
        for label, evaluator in zip(metric_labels, metrics)
    }
    metric_vals = pd.Series(scores, name=name)
    eval_list.append(metric_vals)

# Rows are models, columns are metrics (in `metric_labels` order).
eval_df = pd.concat(eval_list, axis=1).T
eval_df = eval_df[metric_labels]
eval_df

### PySpark + SQL

In [None]:
# Spark: read the local CSV file into a DataFrame, taking column names from the header row.
# No schema inference is requested, so every column is read as string (see printSchema output).
df = spark.read.option("header",True).csv("/content/zipcodes.csv")
df.printSchema()
df.show()

root
 |-- RecordNumber: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- State: string (nullable = true)

+------------+-------+-------------------+-------+-----+
|RecordNumber|Country|               City|Zipcode|State|
+------------+-------+-------------------+-------+-----+
|           1|     US|        PARC PARQUE|    704|   PR|
|           2|     US|PASEO COSTA DEL SUR|    704|   PR|
|          10|     US|       BDA SAN LUIS|    709|   PR|
|       49347|     US|               HOLT|  32564|   FL|
|       49348|     US|          HOMOSASSA|  34487|   FL|
|       61391|     US|  CINGULAR WIRELESS|  76166|   TX|
|       61392|     US|         FORT WORTH|  76177|   TX|
|       61393|     US|           FT WORTH|  76177|   TX|
|       54356|     US|        SPRUCE PINE|  35585|   AL|
|       76511|     US|           ASH HILL|  27007|   NC|
|           4|     US|    URB EUGENE RICE|    704|   PR|
|

In [None]:
# Spark: read the zipcodes CSV from GCS and expose it to SQL as the temp view "Zipcodes".
bucket_name = "aivndemobigdata"
path = f"gs://{bucket_name}/asteroid/zipcodes.csv"

(
    spark.read
    .csv(path, header=True)
    .createOrReplaceTempView("Zipcodes")
)

In [None]:
# DataFrame API Select query: project four columns and display the first 5 rows.
# The lower-case names resolve against the capitalised schema columns (see the output header).
df.select("country","city","zipcode","state").show(5)

+-------+-------------------+-------+-----+
|country|               city|zipcode|state|
+-------+-------------------+-------+-----+
|     US|        PARC PARQUE|    704|   PR|
|     US|PASEO COSTA DEL SUR|    704|   PR|
|     US|       BDA SAN LUIS|    709|   PR|
|     US|               HOLT|  32564|   FL|
|     US|          HOMOSASSA|  34487|   FL|
+-------+-------------------+-------+-----+
only showing top 5 rows



In [None]:
# SQL Select query: the same projection as the DataFrame API cell, run against the ZIPCODES temp view.
spark.sql("SELECT country, city, zipcode, state FROM ZIPCODES") \
     .show(5)

+-------+-------------------+-------+-----+
|country|               city|zipcode|state|
+-------+-------------------+-------+-----+
|     US|        PARC PARQUE|    704|   PR|
|     US|PASEO COSTA DEL SUR|    704|   PR|
|     US|       BDA SAN LUIS|    709|   PR|
|     US|               HOLT|  32564|   FL|
|     US|          HOMOSASSA|  34487|   FL|
+-------+-------------------+-------+-----+
only showing top 5 rows



In [None]:
# DataFrame API where(): keep only Arizona rows, expressed as a SQL-style predicate string.
(
    df.select("country", "city", "zipcode", "state")
    .where("state == 'AZ'")
    .show(5)
)

+-------+----+-------+-----+
|country|city|zipcode|state|
+-------+----+-------+-----+
|     US|MESA|  85209|   AZ|
|     US|MESA|  85210|   AZ|
+-------+----+-------+-----+



In [None]:
# SQL where: the same Arizona filter, written as a WHERE clause on the ZIPCODES view.
spark.sql(""" SELECT  country, city, zipcode, state FROM ZIPCODES
          WHERE state = 'AZ' """) \
     .show(5)

+-------+----+-------+-----+
|country|city|zipcode|state|
+-------+----+-------+-----+
|     US|MESA|  85209|   AZ|
|     US|MESA|  85210|   AZ|
+-------+----+-------+-----+



In [None]:
# sorting: restrict to three states, then order the rows by state.
(
    df.select("country", "city", "zipcode", "state")
    .where("state in ('PR','AZ','FL')")
    .orderBy("state")
    .show(10)
)

+-------+-------------------+-------+-----+
|country|               city|zipcode|state|
+-------+-------------------+-------+-----+
|     US|               MESA|  85209|   AZ|
|     US|               MESA|  85210|   AZ|
|     US|               HOLT|  32564|   FL|
|     US|          HOMOSASSA|  34487|   FL|
|     US|           HILLIARD|  32046|   FL|
|     US|             HOLDER|  34445|   FL|
|     US|        PARC PARQUE|    704|   PR|
|     US|PASEO COSTA DEL SUR|    704|   PR|
|     US|       BDA SAN LUIS|    709|   PR|
|     US|    URB EUGENE RICE|    704|   PR|
+-------+-------------------+-------+-----+
only showing top 10 rows



In [None]:
# SQL ORDER BY: the same three-state filter and ordering, against the ZIPCODES view.
spark.sql(""" SELECT  country, city, zipcode, state FROM ZIPCODES
          WHERE state in ('PR','AZ','FL') order by state """) \
     .show(10)

+-------+-------------------+-------+-----+
|country|               city|zipcode|state|
+-------+-------------------+-------+-----+
|     US|               MESA|  85209|   AZ|
|     US|               MESA|  85210|   AZ|
|     US|               HOLT|  32564|   FL|
|     US|          HOMOSASSA|  34487|   FL|
|     US|           HILLIARD|  32046|   FL|
|     US|             HOLDER|  34445|   FL|
|     US|        PARC PARQUE|    704|   PR|
|     US|PASEO COSTA DEL SUR|    704|   PR|
|     US|       BDA SAN LUIS|    709|   PR|
|     US|    URB EUGENE RICE|    704|   PR|
+-------+-------------------+-------+-----+
only showing top 10 rows



In [None]:
# grouping: number of rows per state.
(
    df.groupBy("state")
    .count()
    .show()
)

+-----+-----+
|state|count|
+-----+-----+
|   AZ|    2|
|   NC|    3|
|   AL|    3|
|   TX|    3|
|   FL|    4|
|   PR|    5|
+-----+-----+



In [None]:
# SQL GROUP BY clause: number of rows per state from the ZIPCODES view.
# The trailing backslash inside the string literal joins the first two lines of the
# query text (it is a Python line continuation, not part of the SQL).
spark.sql(""" SELECT state, count(*) as \
          count FROM ZIPCODES
          GROUP BY state""") \
     .show()

+-----+-----+
|state|count|
+-----+-----+
|   AZ|    2|
|   NC|    3|
|   AL|    3|
|   TX|    3|
|   FL|    4|
|   PR|    5|
+-----+-----+



In [None]:
bucket_name="aivnbigdata"
# NOTE(review): the object is named `zipcode.csv` but it is parsed with the JSON reader, and
# the following cells select `title`, `price` and `year_written` — presumably this should
# point at a books JSON file; confirm the intended object name.
path=f"gs://{bucket_name}/asteroid/zipcode.csv"

# Read the file as JSON records; this rebinds `df`.
df=spark.read.json(path)

df.printSchema()

In [None]:
# Display 7 rows without truncating long cell values.
df.show(7, truncate=False)

In [None]:
# Project three columns and display the first 5 rows.
df.select("title", "price", "year_written").show(5)

In [None]:
# Get books that are written after 1950 & cost greater than $10 (rows without a title are excluded)
df_filtered = df.where("year_written > 1950 AND price > 10 AND title IS NOT NULL")
(
    df_filtered
    .select("title", "price", "year_written")
    .show(50, truncate=False)
)

### Stroke dataset

In [None]:
from pyspark.sql import SparkSession

# Same session setup as the earlier Spark cell. getOrCreate() hands back the already-running
# session when one exists, so this cell mainly matters when this section is run on its own.
spark = SparkSession \
        .builder \
        .config("spark.jars", "/content/drive/MyDrive/AI2023/gcs-connector-hadoop3-latest.jar") \
        .getOrCreate()

# Register the GCS filesystem implementation for gs:// paths.
spark._jsc.hadoopConfiguration().set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
# This is required if you are using service account and set true,
spark._jsc.hadoopConfiguration().set('fs.gs.auth.service.account.enable', 'true')
# Key file used for service-account authentication.
spark._jsc.hadoopConfiguration().set('google.cloud.auth.service.account.json.keyfile', "/content/southern-gasket-395903-48415c815598.json")


In [None]:
input_dir = "gs://aivndemobigdata/asteroid/"
# Load the stroke CSV from GCS with a header row and inferred column types.
df = (
    spark.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .load(f"{input_dir}stroke_data_e.csv")
)
df.columns

In [None]:
# Show the first 5 rows as a pandas table.
pd.DataFrame(df.take(5), columns=df.columns)


In [None]:
# Predictor columns of the stroke dataset (the target column is 'stroke').
featureColumns = [
    'gender',
    'age',
    'diabetes',
    'hypertension',
    'heart disease',
    'smoking history',
    'BMI',
]

In [None]:
# Keep only rows with age strictly greater than 2, then report how many remain.
df = df.where(df['age'] > 2)
df.count()

In [None]:
import matplotlib.pyplot as plt
import numpy as np  # needed for the bar positions; not imported by any earlier visible cell

# Count the rows in each 'stroke' class and bring the (class, count) pairs to the driver.
responses = df.groupBy('stroke').count().collect()
categories = [i[0] for i in responses]
counts = [i[1] for i in responses]

# One bar per class.
ind = np.arange(len(categories))
width = 0.35
plt.bar(ind, counts, width=width, color='r')

plt.ylabel('counts')
plt.title('Stroke')
# plt.bar centres each bar on its x position by default, so the class labels go at
# `ind` itself; offsetting by width/2 would place them off-centre.
plt.xticks(ind, categories)

In [None]:
# Drop every row that has a null in any column, then summarise BMI on what is left.
removeAllDF = df.dropna()
bmi_summary = removeAllDF.describe(['BMI'])
bmi_summary.show()


In [None]:
# Number of rows that survive dropping all rows with nulls.
removeAllDF.count()


In [None]:
# Start the imputation from the age-filtered DataFrame (nulls still present).
imputeDF = df
# Collect the whole Spark DataFrame to the driver as pandas so per-age-band fills can be applied.
imputeDF_Pandas = imputeDF.toPandas()


In [None]:
def _impute_age_band(frame, low, high, fill_values):
    """Return the rows of `frame` with low <= age < high, with NaNs filled per column.

    Args:
        frame: pandas DataFrame with an 'age' column.
        low: Inclusive lower age bound of the band.
        high: Exclusive upper age bound of the band.
        fill_values: Mapping of column name -> value used to replace NaN in that column.

    Returns:
        A new DataFrame holding only the band's rows, NaNs replaced per `fill_values`.
    """
    in_band = (frame['age'] >= low) & (frame['age'] < high)
    return frame[in_band].fillna(value=fill_values)


# Bands are half-open [low, high) so that consecutive bands tile the age axis with no
# gaps: closed integer bounds (e.g. 60..74 followed by "> 75") would silently drop
# age == 75 and any fractional age that falls between two bands (e.g. 9.5).
# For whole-number ages below 75 each band selects exactly the same rows as before.
# NOTE(review): the fill constants are kept as given; presumably typical BMI /
# smoking-history values per age band — their source is not shown here.
df_2_9 = _impute_age_band(imputeDF_Pandas, 2, 10, {'smoking history': 0, 'BMI': 17.125})
df_10_13 = _impute_age_band(imputeDF_Pandas, 10, 14, {'smoking history': 0, 'BMI': 19.5})
df_14_17 = _impute_age_band(imputeDF_Pandas, 14, 18, {'smoking history': 0, 'BMI': 23.05})
df_18_24 = _impute_age_band(imputeDF_Pandas, 18, 25, {'smoking history': 0, 'BMI': 27.1})
df_25_29 = _impute_age_band(imputeDF_Pandas, 25, 30, {'smoking history': 0, 'BMI': 27.9})
df_30_34 = _impute_age_band(imputeDF_Pandas, 30, 35, {'smoking history': 0.25, 'BMI': 29.6})
df_35_44 = _impute_age_band(imputeDF_Pandas, 35, 45, {'smoking history': 0.25, 'BMI': 30.15})
df_45_49 = _impute_age_band(imputeDF_Pandas, 45, 50, {'smoking history': 0, 'BMI': 29.7})
df_50_59 = _impute_age_band(imputeDF_Pandas, 50, 60, {'smoking history': 0, 'BMI': 29.95})
df_60_74 = _impute_age_band(imputeDF_Pandas, 60, 75, {'smoking history': 0, 'BMI': 30.1})
# Open-ended last band: 75 and older.
df_75_plus = _impute_age_band(imputeDF_Pandas, 75, float('inf'), {'smoking history': 0, 'BMI': 28.1})

In [None]:
# Stack the per-age-band frames back into a single pandas DataFrame (original row indices are kept).
all_frames = [df_2_9, df_10_13, df_14_17, df_18_24, df_25_29, df_30_34, df_35_44, df_45_49, df_50_59, df_60_74, df_75_plus]
df_combined = pd.concat(all_frames)

In [None]:
# Convert the imputed pandas frame back into a Spark DataFrame and make it the new `imputeDF`.
df_combined_converted = spark.createDataFrame(df_combined)
imputeDF = df_combined_converted

In [None]:
# Compare BMI summary statistics before (df) and after (imputeDF) the imputation.
df.describe(['BMI']).show()
imputeDF.describe(['BMI']).show()

In [None]:
# Collect the Spark DataFrame to pandas once and derive both frames from that single
# copy; each toPandas() call pulls the full dataset to the driver.
impute_pdf = imputeDF.toPandas()

# Predictors.
X = impute_pdf.filter(items=['gender', 'age', 'diabetes','hypertension','heart disease','smoking history','BMI'])
# Target, kept as a one-column DataFrame.
Y = impute_pdf.filter(items=['stroke'])

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the rows as a test set; the fixed random_state makes the split repeatable.
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.1, random_state=0)
