In [43]:
# instalo yfinance
!pip install yfinance




In [44]:
# obtengo los datos de Tesla
import yfinance as yf
tsla = yf.Ticker('TSLA')
tsla.info


{'address1': '1 Tesla Road',
 'city': 'Austin',
 'state': 'TX',
 'zip': '78725',
 'country': 'United States',
 'phone': '512 516 8177',
 'website': 'https://www.tesla.com',
 'industry': 'Auto Manufacturers',
 'industryDisp': 'Auto Manufacturers',
 'sector': 'Consumer Cyclical',
 'longBusinessSummary': 'Tesla, Inc. designs, develops, manufactures, leases, and sells electric vehicles, and energy generation and storage systems in the United States, China, and internationally. It operates in two segments, Automotive, and Energy Generation and Storage. The Automotive segment offers electric vehicles, as well as sells automotive regulatory credits; and non-warranty after-sales vehicle, used vehicles, retail merchandise, and vehicle insurance services. This segment also provides sedans and sport utility vehicles through direct and used vehicle sales, a network of Tesla Superchargers, and in-app upgrades; purchase financing and leasing services; services for electric vehicles through its compa

In [45]:
# obtengo datos
cmp = yf.Ticker('TSLA')
hist = cmp.history(period="1y")
hist['Date']=hist.index
hist=hist.reset_index(drop=True)
hist.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 251 entries, 0 to 250
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype                           
---  ------        --------------  -----                           
 0   Open          251 non-null    float64                         
 1   High          251 non-null    float64                         
 2   Low           251 non-null    float64                         
 3   Close         251 non-null    float64                         
 4   Volume        251 non-null    int64                           
 5   Dividends     251 non-null    float64                         
 6   Stock Splits  251 non-null    float64                         
 7   Date          251 non-null    datetime64[ns, America/New_York]
dtypes: datetime64[ns, America/New_York](1), float64(6), int64(1)
memory usage: 15.8 KB


In [46]:

hist.head(5)


Unnamed: 0,Open,High,Low,Close,Volume,Dividends,Stock Splits,Date
0,224.603333,243.57666,224.333328,237.036667,122793000,0.0,0.0,2022-06-21 00:00:00-04:00
1,234.503326,246.833328,233.82666,236.08667,101107500,0.0,0.0,2022-06-22 00:00:00-04:00
2,237.906662,239.316666,228.636673,235.070007,104202600,0.0,0.0,2022-06-23 00:00:00-04:00
3,237.470001,246.066666,236.08667,245.706665,95770800,0.0,0.0,2022-06-24 00:00:00-04:00
4,249.366669,252.070007,242.566666,244.919998,89178300,0.0,0.0,2022-06-27 00:00:00-04:00


In [47]:
!pip install psycopg2-binary




In [48]:
# creo una sesion de Spark
import os
import psycopg2

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, lit, col

# driver
driver_path = "/home/coder/working_dir/driver_jdbc/postgresql-42.2.27.jre7.jar"

os.environ['PYSPARK_SUBMIT_ARGS'] = f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
os.environ['SPARK_CLASSPATH'] = driver_path

# Create SparkSession 
spark = SparkSession.builder \
        .master("local") \
        .appName("Conexion entre Pyspark y Redshift") \
        .config("spark.jars", driver_path) \
        .config("spark.executor.extraClassPath", driver_path) \
        .getOrCreate()

env = os.environ


In [49]:
spark

In [86]:
# conecto a Redshift con psycopg2



conn = psycopg2.connect(
    host=env['AWS_REDSHIFT_HOST'],
    port=env['AWS_REDSHIFT_PORT'],
    dbname=env['AWS_REDSHIFT_DBNAME'],
    user=env['AWS_REDSHIFT_USER'],
    password=env['AWS_REDSHIFT_PASSWORD']
)

# creo la tabla
table_name='entregable2_tsla'
cursor = conn.cursor()
cursor.execute(f"""drop table if exists {env['AWS_REDSHIFT_USER']}.{table_name}""")
conn.commit()
create_query=f"""CREATE TABLE IF NOT EXISTS {env['AWS_REDSHIFT_USER']}.{table_name}
    (openn numeric(10,2) 
    ,high numeric(10,2)
    ,low numeric(10,2)
    ,close  numeric(10,2) 
    ,date TIMESTAMP WITHOUT TIME ZONE distkey
    ,difference numeric(10,2)) sortkey(date);"""
cursor.execute(create_query)
conn.commit()

# verifico si se creo la tabla
ctrl_query=f"""SELECT distinct tablename FROM PG_TABLE_DEF WHERE schemaname = '{env['AWS_REDSHIFT_SCHEMA']}' and tablename = '{table_name}';"""
cursor.execute(ctrl_query)
tabla = cursor.fetchall()
if table_name == tabla[0][0]:
    print("Tabla creada!")
else:               
    print("Error al crear la tabla.")
cursor.close()
conn.close()


Tabla creada!


In [81]:
# creo un dataframe 
df = spark.createDataFrame(hist, ['Open','High','low','close','v','d','s','date'])

# selecciono las columnas
df = df.select('Open','High','low','close','date')
df = df.withColumn('Difference', col('close') - col('Open'))

df.printSchema()
df.describe().show()
df.show()


root
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- Difference: double (nullable = true)

+-------+-----------------+-----------------+------------------+------------------+-------------------+
|summary|             Open|             High|               low|             close|         Difference|
+-------+-----------------+-----------------+------------------+------------------+-------------------+
|  count|              251|              251|               251|               251|                251|
|   mean|210.4869188407503|215.3294553414759| 205.4621510296704|210.56319406213038|0.07627522138010458|
| stddev|51.34241739858609|52.04538847697913| 50.69535532742428| 51.35121647917801|  6.531008409968428|
|    min|            103.0|           111.75|101.80999755859375| 108.0999984741211|-19.019989013671875|
|    max|311.6666564941406|314.6666564941

In [87]:
# doy formato a las columnas
from pyspark.sql.types import DecimalType

df=df.withColumn("Open", df["Open"].cast(DecimalType(precision=10, scale=2)))
df=df.withColumn("High", df["High"].cast(DecimalType(precision=10, scale=2)))
df=df.withColumn("low", df["low"].cast(DecimalType(precision=10, scale=2)))
df=df.withColumn("close", df["close"].cast(DecimalType(precision=10, scale=2)))
df=df.withColumn("Difference", df["Difference"].cast(DecimalType(precision=10, scale=2)))

# cambio el nombre de la columna open
df=df.withColumnRenamed("Open", "openn")

df.printSchema()
df.show()
df.count()


root
 |-- openn: decimal(10,2) (nullable = true)
 |-- High: decimal(10,2) (nullable = true)
 |-- low: decimal(10,2) (nullable = true)
 |-- close: decimal(10,2) (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- Difference: decimal(10,2) (nullable = true)

+------+------+------+------+-------------------+----------+
| openn|  High|   low| close|               date|Difference|
+------+------+------+------+-------------------+----------+
|224.60|243.58|224.33|237.04|2022-06-21 04:00:00|     12.43|
|234.50|246.83|233.83|236.09|2022-06-22 04:00:00|      1.58|
|237.91|239.32|228.64|235.07|2022-06-23 04:00:00|     -2.84|
|237.47|246.07|236.09|245.71|2022-06-24 04:00:00|      8.24|
|249.37|252.07|242.57|244.92|2022-06-27 04:00:00|     -4.45|
|244.48|249.97|232.34|232.66|2022-06-28 04:00:00|    -11.82|
|230.50|231.17|222.27|228.49|2022-06-29 04:00:00|     -2.01|
|224.51|229.46|218.86|224.47|2022-06-30 04:00:00|     -0.04|
|227.00|230.23|222.12|227.26|2022-07-01 04:00:00|      0.26|
|

251

In [88]:
# elimino las ocurrencias duplicadas
from pyspark.sql.functions import col
dfo=df.dropDuplicates(["date"])
dfo.count()


251

In [89]:
# guardo los datos en la tabla
df.write.format('jdbc') \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"{env['AWS_REDSHIFT_SCHEMA']}.{table_name}") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

 

In [90]:
# verifico los datos guardados 
query = f"select * from {env['AWS_REDSHIFT_SCHEMA']}.{table_name}"
df_db = spark.read \
    .format("jdbc") \
    .option("url", f"jdbc:postgresql://{env['AWS_REDSHIFT_HOST']}:{env['AWS_REDSHIFT_PORT']}/{env['AWS_REDSHIFT_DBNAME']}") \
    .option("dbtable", f"({query}) as t1") \
    .option("user", env['AWS_REDSHIFT_USER']) \
    .option("password", env['AWS_REDSHIFT_PASSWORD']) \
    .option("driver", "org.postgresql.Driver") \
    .load()
    

In [91]:
# verifico el formato de los datos guardados

df_db.printSchema()
df_db.show()
df_db.count()


root
 |-- openn: decimal(10,2) (nullable = true)
 |-- high: decimal(10,2) (nullable = true)
 |-- low: decimal(10,2) (nullable = true)
 |-- close: decimal(10,2) (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- difference: decimal(10,2) (nullable = true)

+------+------+------+------+-------------------+----------+
| openn|  high|   low| close|               date|difference|
+------+------+------+------+-------------------+----------+
|224.51|229.46|218.86|224.47|2022-06-30 04:00:00|     -0.04|
|233.92|245.36|232.21|244.54|2022-07-07 04:00:00|     10.62|
|242.33|254.98|241.16|250.76|2022-07-08 04:00:00|      8.43|
|252.10|253.06|233.63|234.34|2022-07-11 04:00:00|    -17.76|
|225.50|242.06|225.03|237.04|2022-07-13 04:00:00|     11.54|
|240.00|243.62|236.89|240.07|2022-07-15 04:00:00|      0.07|
|244.94|250.52|239.60|240.55|2022-07-18 04:00:00|     -4.39|
|245.00|247.14|236.98|245.53|2022-07-19 04:00:00|      0.53|
|276.22|280.79|270.71|272.24|2022-07-22 04:00:00|     -3.98|
|

251