In [1]:
# ONLY REQUIRED IF THE NOTEBOOK IS NOT STARTED BY `pyspark`
# (the `pyspark` shell already injects `spark` and `sc` into the namespace).
if "spark" in globals():
    print("SparkSession already configured!")
else:
    from pyspark import SparkContext, SparkConf
    from pyspark.sql import SparkSession

    conf = SparkConf().setAppName("intro-to-df-2").setMaster("local")
    # getOrCreate() reuses a context that is already running instead of
    # raising, so the cell stays safe to re-run after `spark` was unbound.
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    sc = spark.sparkContext
    # Avoid polluting the console with warning messages
    sc.setLogLevel("ERROR")

# To avoid breaks when showing tables with lot of columns.
# `display` is imported explicitly rather than relying on the notebook builtin.
from IPython.display import HTML, display

display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
# Load both CSV inputs, treating the first line of each file as the column names.
acciones_df = spark.read.csv("Acciones.csv", header=True)
datos_df = spark.read.csv("datos.csv", header=True)

In [3]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
def year(x):
    """Return the year encoded in the first four characters of a date string.

    Parameters
    ----------
    x : str or None
        Date text whose first four characters are the year.

    Returns
    -------
    int or None
        The leading four characters converted to an integer, or None when
        ``x`` is None.

    Raises
    ------
    ValueError
        If the leading characters of a non-null value are not an integer.
    """
    # A null cell reaches a Python UDF as None; slicing it would raise
    # TypeError, so propagate the null instead.
    if x is None:
        return None
    return int(x[:4])

# Spark UDF that forwards each value to `year` and declares an integer result
year_udf_int = udf(
    f=lambda date_text: year(date_text),
    returnType=IntegerType(),
)

In [4]:
# Derive `year` from each row's `date`, then keep only the price/volume
# columns together with that year (the `date` column is dropped here).
acciones_df = (
    acciones_df
    .withColumn("year", year_udf_int("date"))
    .select("open", "high", "low", "close", "volume", "year")
)
acciones_df.show()

+------+------+------+------+--------+----+
|  open|  high|   low| close|  volume|year|
+------+------+------+------+--------+----+
|3.8000|5.0000|3.5080|4.7780|93831500|2010|
|5.1580|6.0838|4.6600|4.7660|85935500|2010|
|5.0000|5.1840|4.0540|4.3920|41094000|2010|
|4.6000|4.6200|3.7420|3.8400|25699000|2010|
|4.0000|4.0000|3.1660|3.2220|34334500|2010|
|3.2800|3.3260|2.9960|3.1600|34608500|2010|
|3.2280|3.5040|3.1140|3.4920|38557000|2010|
|3.5160|3.5800|3.3100|3.4800|20253000|2010|
|3.5900|3.6140|3.4000|3.4100|11012500|2010|
|3.4788|3.7280|3.3800|3.6280|13400500|2010|
|3.5880|4.0300|3.5520|3.9680|20976000|2010|
|3.9880|4.3000|3.8000|3.9780|18699000|2010|
|4.1400|4.2600|4.0100|4.1280|13106500|2010|
|4.2740|4.4500|4.1840|4.3820|12432500|2010|
|4.3700|4.3700|4.0100|4.0600| 9126500|2010|
|4.1320|4.1800|3.9000|4.0440| 6262500|2010|
|4.1000|4.2500|4.0740|4.2000| 4789000|2010|
|4.2380|4.3120|4.2120|4.2580| 3268000|2010|
|4.3000|4.3000|4.0600|4.1900| 4611000|2010|
|4.1820|4.2360|4.0520|4.1100| 30

In [5]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col

# Convert the text columns read from the CSV into numbers.
# Cast to double rather than 32-bit float: a 32-bit float cannot represent
# large share volumes (e.g. 93831500) exactly and adds rounding noise to the
# prices, both of which would leak into the averages computed from them.
acciones_df = (
    acciones_df
    .withColumn("high", col("high").cast("double"))
    .withColumn("low", col("low").cast("double"))
    .withColumn("volume", col("volume").cast("double"))
)

In [6]:
# Collapse to one row per year holding the mean of high, low and volume.
acciones_df = (
    acciones_df
    .groupBy("year")
    .mean("high", "low", "volume")
)


In [7]:
# Rename the "Año" column to "year", the name the later join uses as its key.
datos_df = datos_df.withColumnRenamed(existing="Año", new="year")
datos_df.show()

+----+---------------------------------------+------------------------------------+------------------------------------------------+--------------------------------------+----------------------------------------------------------------------+----------------------------------------------------+-----------------------------------+--------------+--------------------------------+-----------------------------+-------------+-------------+-------------+-------------+
|year|Anual revenue (in million U.S. dollars)|Net income (in million U.S. dollars)|Estimated EVs sales worldwide (in million units)|R&D Expenses (in million U.S. dollars)|Selling, general and administrative expenses (in million U.S. dollars)|Number vehicles delivered worldwide (in 1,000 units)|Vehicle production (in 1,000 units)|Employee count|Lithium Battery Prices Worldwide|Count Superchargers Worldwide|Model S sales|Model Y sales|Model X sales|Model 3 sales|
+----+---------------------------------------+----------------------

In [8]:
# Left join: every row of datos_df is kept, with the yearly stock averages
# attached where a matching year exists.
# Joining on an expression keeps the key from both sides, so drop the
# right-hand `year`; otherwise the result (and any CSV written from it)
# carries two columns with the same name.
datos_df = datos_df.join(
    acciones_df,
    datos_df.year == acciones_df.year,
    how="left",
).drop(acciones_df.year)

In [9]:
# Collect the joined result to the driver as a pandas DataFrame and write it to CSV.
tesla_pdf = datos_df.toPandas()
tesla_pdf.to_csv("Tesla_Spark.csv")