# INSTALL LIBRARY

In [None]:
! pip install boto3

! pip install psycopg2

## INSTALL OTHER DEPENDENCIES THAT WILL HELP SPARK TO WORK

In [None]:
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
 
!wget -q "https://www-us.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz"
#UNZIP
! tar -xf /content/spark-3.1.1-bin-hadoop2.7.tgz

! pip install -q findspark

# IMPORT LIBRARIES

In [None]:
import os
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import boto3
from pyspark import SparkFiles
import psycopg2 as psy

# CONFIGURE PYSPARK & AWS BOTO3

In [None]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [None]:
 # FOR SPARK
 spark = SparkSession.builder\
        .master("local")\
        .appName("ETL_WITH_AWS")\
        .config('spark.driver.extraClassPath', '/content/spark-3.1.1-bin-hadoop2.7')\
        .getOrCreate()

In [None]:
#FOR AWS S3
client=boto3.client('s3', aws_access_key_id='<KEY_ID>',
    aws_secret_access_key='<PRIVATE-KEYS>')


In [None]:
#FOR AWS RDS
rds=boto3.client('rds', 'us-east-1', aws_access_key_id='<ACCESS-KEY>',
    aws_secret_access_key='<ACCESS-KEY>'
)

In [None]:
# FOR PSYCOPG2
conn= psy.connect(host='db1.chrysjn10xko.us-east-1.rds.amazonaws.com', 
                  port=5432,
                  user='<USER>',
                  password='<PASSWORD>',
                  database='db1'
                  )


cur= conn.cursor()

# TO EXTRACT DATA

## TO EXTRACT FROM LOCAL DIRECTORY

In [None]:
data=spark.read.csv('/content/drive/MyDrive/Colab Notebooks/BreastCancer.csv', inferSchema= True, header= True)
data.show()

+--------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----+
|      id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|_c32|
+--------+---------+-----------+------

## TO EXTRACT FROM AWS S3 PUBLIC

In [None]:
url='https://gim-ultra.s3.amazonaws.com/2021_public.csv'
spark.sparkContext.addFile(url)
data=spark.read.csv(SparkFiles.get('2021_public.csv'), header=True, inferSchema=True)
data.show()

+--------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----+
|      id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|_c32|
+--------+---------+-----------+------

## TO EXTRACT FROM AWS S3 PRIVATE

In [None]:
#spark._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
#spark._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<ACCESS-ID>")
#spark._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<ACCESS-KEY")
#spark.read.format('csv').load('s3://gim-ultra/2021_private.csv' )


# OR USE AWS BOTO3

client.download_file( 'gim-ultra', '2021_private.csv', 'new.csv')

In [None]:
# read new file with pyspark
data_s=spark.read.csv('new.csv', header= True, inferSchema= True)
data_s.show()

+--------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----+
|      id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|_c32|
+--------+---------+-----------+------

## TO EXTRACT FILE FROM RDS

In [None]:
query= """ Select * from sometablename3 """
cur.execute(query)
result = cur.fetchone()
for i in result:
  print (result)

(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')


# TO TRANSFORM DATA

## I USE SPARK SQL FOR EASY MANIPULATION

In [None]:
# CONVERT DF TO SQL TABLE

table=data.registerTempTable('table1')

In [None]:
# write a query that selects the only'mean' related column

query = 'SELECT id, diagnosis, radius_mean, perimeter_mean, smoothness_mean, compactness_mean, concavity_mean, symmetry_mean  FROM table1'
sql1=spark.sql(query)
sql1.show()

+--------+---------+-----------+--------------+---------------+----------------+--------------+-------------+
|      id|diagnosis|radius_mean|perimeter_mean|smoothness_mean|compactness_mean|concavity_mean|symmetry_mean|
+--------+---------+-----------+--------------+---------------+----------------+--------------+-------------+
|  842302|        M|      17.99|         122.8|         0.1184|          0.2776|        0.3001|       0.2419|
|  842517|        M|      20.57|         132.9|        0.08474|         0.07864|        0.0869|       0.1812|
|84300903|        M|      19.69|         130.0|         0.1096|          0.1599|        0.1974|       0.2069|
|84348301|        M|      11.42|         77.58|         0.1425|          0.2839|        0.2414|       0.2597|
|84358402|        M|      20.29|         135.1|         0.1003|          0.1328|         0.198|       0.1809|
|  843786|        M|      12.45|         82.57|         0.1278|            0.17|        0.1578|       0.2087|
|  844359|

# TO LOAD DATA

## TO LOAD TO LOCAL MACHINE

In [None]:
sql1.write.csv('data.csv')
# OR
sql1.coalesce(1).write.csv('data1.csv')

## TO WRITE TO AWS S3

In [None]:
# PRIVATELY
s3_priv=client.upload_file('/content/data1.csv/part-00000-cb935cf3-c97f-42db-88c0-b21d8ac507f1-c000.csv', Bucket='gim-ultra',Key='2021_private1.csv')

# FOR PUBLIC
s3.upload_file('/content/data1.csv/part-00000-cb935cf3-c97f-42db-88c0-b21d8ac507f1-c000.csv', Bucket='gim-ultra',Key='2021_public.csv', ExtraArgs={'ACL':'public-read'})

## TO LOAD DATA TO RDS

### CREATE DB INSTANCE

In [1]:

# CREATE A DB_INSTANCE ENGINE

'''response=rds.create_db_instance(Engine='Postgres',
 AllocatedStorage= 10, DBInstanceIdentifier='mydb', DBName='u<NAME>',
  DBInstanceClass='db.m4.Large', MasterUsername='<USERNAME',
   MasterUserPassword='<PASSWORD>') '''

"response=rds.create_db_instance(Engine='Postgres',\n AllocatedStorage= 10, DBInstanceIdentifier='mydb', DBName='u<NAME>',\n  DBInstanceClass='db.m4.Large', MasterUsername='<USERNAME',\n   MasterUserPassword='<PASSWORD>') "

In [None]:

cur.execute("""CREATE TABLE sometablename3(
some_col integer PRIMARY KEY,
some_col1 text,
some_col2 text,
some_col3 text,
some_col4 text,
some_col5 text,
some_col6 text,
some_col7 text
) """)
conn.commit()

In [None]:
# COPY CSV DATA INTO DATABASE
with open('/content/data1.csv/part-00000-cb935cf3-c97f-42db-88c0-b21d8ac507f1-c000.csv', 'r') as f:
  next(f) # Skip the header row.
cur.copy_from(f, 'sometablename3', sep=',')
conn.commit()

In [None]:
query= """ Select * from sometablename3 """
cur.execute(query)
result = cur.fetchone()
for i in result:
  print (result)
#conn.close()

(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
(842517, 'M', '20.57', '132.9', '0.08474', '0.07864', '0.0869', '0.1812')
