In [1]:
from os import scandir, path
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import lit,create_map
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, DateType, LongType, StringType

from itertools import chain

In [2]:
sc = SparkContext.getOrCreate()

In [3]:
sqlContext = pyspark.sql.SQLContext(sc)

In [4]:
priceDF = sqlContext.read.format('csv') \
  .options(header='true', inferSchema='true') \
  .load('gs://financials-data-bucket/data/prueba/dataSBG/price.csv')

In [5]:
priceDF = priceDF.select( \
F.col('code').alias('CODE'), \
F.col('Price').alias('PRICE'), \
F.col('Date').alias('DATE'))

In [6]:
priceDF = priceDF.withColumn('YEAR', F.substring('DATE', 1, 4).cast(IntegerType()))

In [7]:
#Rentabilidad diaria
windowSpec = Window.partitionBy("CODE").orderBy(F.col('DATE').asc()).rowsBetween(-1,0)
priceDF = priceDF.withColumn('AUX', F.sum("price").over(windowSpec))
priceDF = priceDF.withColumn("RETURNS", (F.col("PRICE") - (F.col("AUX")-F.col("PRICE"))) / F.col("PRICE")).drop("AUX")
priceDF.show()

+----+-----+-------------------+----+--------------------+
|CODE|PRICE|               DATE|YEAR|             RETURNS|
+----+-----+-------------------+----+--------------------+
| MMM| 9.22|1972-06-01 00:00:00|1972|                 1.0|
| MMM| 9.26|1972-06-02 00:00:00|1972|0.004319654427645697|
| MMM|  9.2|1972-06-05 00:00:00|1972|-0.00652173913043503|
| MMM| 9.17|1972-06-06 00:00:00|1972|-0.00327153762268...|
| MMM| 9.08|1972-06-07 00:00:00|1972|-0.00991189427312...|
| MMM| 9.02|1972-06-08 00:00:00|1972|-0.00665188470066...|
| MMM| 8.94|1972-06-09 00:00:00|1972|-0.00894854586129...|
| MMM|  8.9|1972-06-12 00:00:00|1972|-0.00449438202247...|
| MMM| 8.96|1972-06-13 00:00:00|1972|0.006696428571428824|
| MMM|  9.1|1972-06-14 00:00:00|1972|0.015384615384615058|
| MMM| 9.26|1972-06-15 00:00:00|1972| 0.01727861771058317|
| MMM| 9.25|1972-06-16 00:00:00|1972|-0.00108108108108...|
| MMM| 9.14|1972-06-19 00:00:00|1972|-0.01203501094091...|
| MMM|  9.1|1972-06-20 00:00:00|1972|-0.00439560439560..

In [8]:
wCY = Window.partitionBy("CODE", "YEAR").orderBy("DATE")
#Nos quedamos con el precio de cada compañia en cada año el dia 1
dico = priceDF.withColumn("RminD",  F.row_number().over(wCY)).filter("RminD == 1").drop("DATE", "RminD") 

In [9]:
#Lista del tipo codyear: precio inicial al principio de año
df_dict = [{r['CODE'] + str(r['YEAR']): r['PRICE']} for r in dico.orderBy("CODE", "YEAR").collect()] #Es una lista
df_dict = dict((key,d[key]) for d in df_dict for key in d) #Transformo a diccionario

In [10]:
mapping_expr = create_map([F.lit(x) for x in chain(*df_dict.items())]) #Transformamos a un mapa
r_acuDF = priceDF.withColumn("CUMULATIVE_RETURNS", (F.col("PRICE") - mapping_expr.getItem(F.concat(F.col("CODE"), F.col("YEAR").cast(StringType())))) / mapping_expr.getItem(F.concat(F.col("CODE"), F.col("YEAR").cast(StringType()))))
r_acuDF.show(1000)

+----+-----+-------------------+----+--------------------+--------------------+
|CODE|PRICE|               DATE|YEAR|             RETURNS|  CUMULATIVE_RETURNS|
+----+-----+-------------------+----+--------------------+--------------------+
| MMM| 9.22|1972-06-01 00:00:00|1972|                 1.0|                 0.0|
| MMM| 9.26|1972-06-02 00:00:00|1972|0.004319654427645697|0.004338394793926155|
| MMM|  9.2|1972-06-05 00:00:00|1972|-0.00652173913043503|-0.00216919739696327|
| MMM| 9.17|1972-06-06 00:00:00|1972|-0.00327153762268...|-0.00542299349240...|
| MMM| 9.08|1972-06-07 00:00:00|1972|-0.00991189427312...|-0.01518438177874...|
| MMM| 9.02|1972-06-08 00:00:00|1972|-0.00665188470066...|-0.02169197396963135|
| MMM| 8.94|1972-06-09 00:00:00|1972|-0.00894854586129...|-0.03036876355748385|
| MMM|  8.9|1972-06-12 00:00:00|1972|-0.00449438202247...|-0.03470715835141001|
| MMM| 8.96|1972-06-13 00:00:00|1972|0.006696428571428824|-0.02819956616052...|
| MMM|  9.1|1972-06-14 00:00:00|1972|0.0

In [11]:
%%time

r_acuDF \
.write.format("com.databricks.spark.csv") \
.option("header", "true") \
.save("gs://financials-data-bucket/data/prueba/rentabilidad_prices.csv")

CPU times: user 68 ms, sys: 24 ms, total: 92 ms
Wall time: 6min 29s
