In [1]:
spark

Waiting for a Spark session to start...

In [12]:
%AddJar https://producteng.builds.test.netflix.net/job/rank-v2-release/342/artifact/rank-spark/build/libs/rank-spark_2.11-5.0.64-all.jar

Starting download from https://producteng.builds.test.netflix.net/job/rank-v2-release/342/artifact/rank-spark/build/libs/rank-spark_2.11-5.0.64-all.jar
Finished download of rank-spark_2.11-5.0.64-all.jar


In [13]:
val tableName = "rmenezes.quotes_vegas"

tableName = rmenezes.quotes_vegas


rmenezes.quotes_vegas

In [14]:
import com.netflix.boson.viz.display._
import vegas._
import vegas.sparkExt._

In [15]:
import com.netflix.boson.util.DateInt

def isTradingDay(date: Int) = {
    spark.table(tableName).filter($"date" === date).count > 100
}

def getNextTradingDay(date: Int) = {
    var currDate = DateInt(date).plusDays(1).intValue
    while (!isTradingDay(currDate)) {
        currDate = DateInt(currDate).plusDays(1).intValue
    }
    currDate
}


def getPreviousTradingDay(date: Int) = {
    var currDate = DateInt(date).minusDays(1).intValue
    while (!isTradingDay(currDate)) {
        currDate = DateInt(currDate).minusDays(1).intValue
    }
    currDate
}

def getTradingDay(date: Int) = {
    if (isTradingDay(date)) {
        date
    } else {
        getNextTradingDay(date)
    }
}

isTradingDay: (date: Int)Boolean
getNextTradingDay: (date: Int)Int
getPreviousTradingDay: (date: Int)Int
getTradingDay: (date: Int)Int


In [16]:
getTradingDay(20200104)

20200106

In [17]:
def percentageGrowth: (Double, Double) => Double = { (start: Double, end: Double) =>
    val growth = (end - start) / start
    growth * 100.0
} 

def percentageGrowthUDF = udf { percentageGrowth }

percentageGrowth = > Double


percentageGrowthUDF: org.apache.spark.sql.expressions.UserDefinedFunction


<function2>

In [18]:
import org.apache.spark.sql.DataFrame

def readDF(date: Int) = {
    val df = spark.table(tableName).filter($"date" === date)
    assert(df.count > 1000, s"$date wasn't a trading day")
    df
}

def readSafeDF(date: Int) = {
    val tradingDay = getTradingDay(date)
    Console.println(s"trading day is ${tradingDay}")
    spark.table(tableName).filter($"date" === getTradingDay(tradingDay))
}


def getComparisionDF(startDF: DataFrame, endDF: DataFrame) = {
  startDF.join(endDF, startDF("ticker") === endDF("ticker"))
    .withColumn("diff_percentage", percentageGrowthUDF(startDF("open"), endDF("open")))
    .select(startDF("ticker"),
            endDF("open") as "end_of_period",
            startDF("open") as "start_of_period",
            $"diff_percentage"
    )
}

def getComparisionDFWithDates(startDate: Int, endDate: Int) = {
  val startDF = readDF(startDate)
  val endDF = readDF(endDate)
  getComparisionDF(startDF, endDF)
}

def getSafeComparisionDFWithDates(startDate: Int, endDate: Int) = {
  val startDF = readSafeDF(startDate)
  val endDF = readSafeDF(endDate)
  getComparisionDF(startDF, endDF)
}

readDF: (date: Int)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
readSafeDF: (date: Int)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
getComparisionDF: (startDF: org.apache.spark.sql.DataFrame, endDF: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame
getComparisionDFWithDates: (startDate: Int, endDate: Int)org.apache.spark.sql.DataFrame
getSafeComparisionDFWithDates: (startDate: Int, endDate: Int)org.apache.spark.sql.DataFrame


In [21]:
val namesDF = spark.table("rmenezes.zz_20201014_quotes_names")

namesDF = [ticker: string, name: string]


[ticker: string, name: string]

In [23]:
def addNames(df: DataFrame) = {
    df.join(namesDF, df("ticker") === namesDF("ticker"), "left_outer")
        .drop(namesDF("ticker"))
}

addNames: (df: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


### Follow up

1. Plot how the percentage growth over the last 5 years, looks like for a bunch of stocks, FANG
2. Visualize them over Vegas.
3. Also, just check how the average prices of stocks have gone higher.  So, if you would have bought it every day, you still would have been in a better position now.
4. What happens if you would have bought it on only days when the stock has dropped.
5. Stocks that have been growing positively year over year but have dropped down in 2020. Rather than comparing single days, compare 7 day moving averages.


In [19]:
val growth2019DF = addNames(getSafeComparisionDFWithDates(20190101, 20191231))

trading day is 20190101
trading day is 20191231


growth2019DF = [ticker: string, end_of_period: double ... 3 more fields]


[ticker: string, end_of_period: double ... 3 more fields]

In [22]:
val growth2018DF = addNames(getSafeComparisionDFWithDates(20180101, 20181231))

trading day is 20180101
trading day is 20181231


growth2018DF = [ticker: string, end_of_period: double ... 3 more fields]


[ticker: string, end_of_period: double ... 3 more fields]

In [20]:
growth2019DF.printSchema

root
 |-- ticker: string (nullable = true)
 |-- end_of_period: double (nullable = true)
 |-- start_of_period: double (nullable = true)
 |-- diff_percentage: double (nullable = true)
 |-- name: string (nullable = true)



In [26]:
def func: (String, String) => String = { (year, column) =>
    if (List("end_of_period", "start_of_period", "diff_percentage").contains(column))
        s"${year}_$column"
    else
        column
    
}

func = > String


<function2>

In [28]:
func("2018", "end_of_period")

2018_end_of_period

In [None]:
lookup.foldLeft(df)((acc, ca) => acc.withColumnRenamed(ca._1, ca._2))

In [31]:
growth2019DF.columns.foreach { col =>
    growth2019DF.withColumnRenamed(col, func("2019", col))
}

Name: Compile Error
Message: <console>:78: error: value printSchema is not a member of Unit
possible cause: maybe a semicolon is missing before `value printSchema'?
       }.printSchema
         ^

StackTrace: 

In [35]:
val growth2019DFChanged = growth2019DF.columns.foldLeft(growth2019DF)((acc, ca) => acc.withColumnRenamed(ca, func("2019", ca)))

growth2019DFChanged = [ticker: string, 2019_end_of_period: double ... 3 more fields]


[ticker: string, 2019_end_of_period: double ... 3 more fields]

In [36]:
val growth2018DFChanged = growth2018DF.columns.foldLeft(growth2018DF)((acc, ca) => acc.withColumnRenamed(ca, func("2018", ca)))

growth2018DFChanged = [ticker: string, 2018_end_of_period: double ... 3 more fields]


[ticker: string, 2018_end_of_period: double ... 3 more fields]

In [38]:
growth2019DFChanged.join(growth2018DFChanged, Seq("ticker"))
    .filter($"2019_diff_percentage" > 10.0)
    .filter($"2018_diff_percentage" > 10.0)
    .orderBy($"2019_diff_percentage".desc).display
    



ticker,2019_end_of_period,2019_start_of_period,2019_diff_percentage,name,2018_end_of_period,2018_start_of_period,2018_diff_percentage,name.1
ENPH,25.16,4.73,431.92389006342495,Enphase Energy Inc,4.84,2.41,100.82987551867215,Enphase Energy Inc
ARWR,63.94,12.42,414.8148148148148,Arrowhead Pharma,12.16,3.68,230.43478260869568,Arrowhead Pharma
MYSZ,3.3963,0.77,341.0779220779221,My Size Inc,0.775,0.65,19.23076923076923,My Size Inc
XBIT,19.59,5.08,285.6299212598425,Xbiotech Inc,5.0,3.94,26.903553299492387,Xbiotech Inc
RETA,203.21,56.1,262.2281639928699,Reata Pharma,54.53,28.32,92.5494350282486,Reata Pharma
CCXI,37.92,10.91,247.5710357470211,Chemocentryx Inc,10.99,5.95,84.70588235294117,Chemocentryx Inc
IOVA,27.42,8.85,209.83050847457628,Iovance Biotherapeutics Inc,9.26,8.0,15.749999999999996,Iovance Biotherapeutics Inc
MRTX,127.98,42.42,201.6973125884017,Mirati Therapeutics,42.12,18.25,130.79452054794518,Mirati Therapeutics
AUPH,19.96,6.82,192.66862170087975,Aurinia Pharm Ord,6.85,4.53,51.21412803532007,Aurinia Pharm Ord
KRYS,57.39,20.78,176.17901828681423,Krystal Biotech Inc,20.84,10.52,98.09885931558937,Krystal Biotech Inc


In [32]:
growth2019DF.printSchema

root
 |-- ticker: string (nullable = true)
 |-- end_of_period: double (nullable = true)
 |-- start_of_period: double (nullable = true)
 |-- diff_percentage: double (nullable = true)
 |-- name: string (nullable = true)



In [None]:
Map("")

In [21]:
growth2019DF
    .filter($"start_of_period" > 10)
    .orderBy($"diff_percentage".desc).display

ticker,end_of_period,start_of_period,diff_percentage,name
CDLX,61.45,10.83,467.4053554939982,Cardlytics Inc
ARWR,63.94,12.42,414.8148148148148,Arrowhead Pharma
EIDX,60.36,13.76,338.66279069767444,Eidos Therapeutics Inc
ROKU,128.75,30.64,320.20234986945167,Roku Inc
RETA,203.21,56.1,262.2281639928699,Reata Pharma
CCXI,37.92,10.91,247.5710357470211,Chemocentryx Inc
ARVN,42.28,12.85,229.0272373540856,Arvinas Inc
ZEAL,35.3599,11.61,204.5641688199828,Zealand Pharma A/S ADR
MRTX,127.98,42.42,201.6973125884017,Mirati Therapeutics
DCPH,61.9,20.99,194.9023344449738,Deciphera Pharmaceuticals Inc


In [14]:
val growth2016DF = getSafeComparisionDFWithDates(20160101, 20161231)

trading day is 20160101
trading day is 20170102


growth2016DF = [ticker: string, end_of_period: double ... 2 more fields]


[ticker: string, end_of_period: double ... 2 more fields]

In [15]:
val growth2015DF = getSafeComparisionDFWithDates(20150101, 20151231)

trading day is 20150101
trading day is 20151231


growth2015DF = [ticker: string, end_of_period: double ... 2 more fields]


[ticker: string, end_of_period: double ... 2 more fields]

In [28]:
growth2019DF
    .filter($"ticker".isin(FAANG:_*))
    .display()

ticker,end_of_period,start_of_period,diff_percentage
AAPL,290.2,157.74,83.97362748827182
AMZN,1842.0,1501.97,22.63893419975099
FB,203.995,131.09,55.61446334579296
GOOG,1330.11,1035.61,28.437346105194045
MSFT,156.77,101.57,54.34675593186967
NFLX,322.0,267.66,20.301875513711416


In [7]:
val growth2018DF = getSafeComparisionDFWithDates(20180101, 20181231)

trading day is 20180101
trading day is 20181231


growth2018DF = [ticker: string, end_of_period: double ... 2 more fields]


[ticker: string, end_of_period: double ... 2 more fields]

In [8]:
growth2018DF
    .filter($"ticker".isin(FAANG:_*))
    .display()

ticker,end_of_period,start_of_period,diff_percentage
AAPL,158.53,169.23,-6.322756012527324
AMZN,1510.8,1169.47,29.186725610746745
FB,134.45,176.46,-23.80709509237222
GOOG,1050.96,1046.4,0.4357798165137562
MSFT,101.29,85.54,18.41243862520458
NFLX,260.16,191.96,35.52823504896854


In [16]:
val fullDF = growth2018DF.withColumn("year", lit(2018))
    .union(growth2019DF.withColumn("year", lit(2019)))
    .union(growth2015DF.withColumn("year", lit(2015)))
    .union(growth2016DF.withColumn("year", lit(2016)))
    .union(growth2017DF.withColumn("year", lit(2017)))

fullDF = [ticker: string, end_of_period: double ... 3 more fields]


[ticker: string, end_of_period: double ... 3 more fields]

In [40]:
def startOfYear(year: Int) = {
    s"${year}0101".toInt
}

startOfYear: (year: Int)Int


In [41]:
def endOfYear(year: Int) = {
    s"${year}1231".toInt
}

endOfYear: (year: Int)Int


In [42]:
val dfs = List(2015, 2016, 2017, 2018, 2019).map { year =>
    getSafeComparisionDFWithDates(startOfYear(year), endOfYear(year))
        .withColumn("year", lit(year))
}

trading day is 20150101
trading day is 20151231
trading day is 20160101
trading day is 20170102
trading day is 20170102
trading day is 20180101
trading day is 20180101
trading day is 20181231
trading day is 20190101
trading day is 20191231


dfs = List([ticker: string, end_of_period: double ... 3 more fields], [ticker: string, end_of_period: double ... 3 more fields], [ticker: string, end_of_period: double ... 3 more fields], [ticker: string, end_of_period: double ... 3 more fields], [ticker: string, end_of_period: double ... 3 more fields])


List([ticker: string, end_of_period: double ... 3 more fields], [ticker: string, end_of_period: double ... 3 more fields], [ticker: string, end_of_period: double ... 3 more fields], [ticker: string, end_of_period: double ... 3 more fields], [ticker: string, end_of_period: double ... 3 more fields])

In [43]:
val overYearDF = dfs.reduce(_ union _)

overYearDF = [ticker: string, end_of_period: double ... 3 more fields]


[ticker: string, end_of_period: double ... 3 more fields]

In [62]:
val overYearDFWithNames = addNames(overYearDF)

overYearDFWithNames = [ticker: string, end_of_period: double ... 4 more fields]


[ticker: string, end_of_period: double ... 4 more fields]

In [66]:
overYearDFWithNames.filter($"year" === 2019)
    .filter($"start_of_period" > 20)
    .orderBy($"diff_percentage".desc).display



ticker,end_of_period,start_of_period,diff_percentage,year,name
ROKU,128.75,30.64,320.20234986945167,2019,Roku Inc
RETA,203.21,56.1,262.2281639928699,2019,Reata Pharma
MRTX,127.98,42.42,201.6973125884017,2019,Mirati Therapeutics
DCPH,61.9,20.99,194.9023344449738,2019,Deciphera Pharmaceuticals Inc
KRYS,57.39,20.78,176.17901828681423,2019,Krystal Biotech Inc
SEDG,93.9,35.1,167.52136752136752,2019,Solaredge Tech
NVCR,84.93,33.48,153.67383512544808,2019,Novocure Ltd Ord Sh
EHTH,95.67,38.42,149.01093180635087,2019,Ehealth Inc
CRUS,82.6,33.18,148.94514767932486,2019,Cirrus Logic Inc
QURE,71.46,28.82,147.95281054823036,2019,Uniqure N.V.


In [64]:
overYearDFWithNames.filter($"year" === 2018)
    .orderBy($"diff_percentage".desc).display

ticker,end_of_period,start_of_period,diff_percentage,year,name
TNDM,37.8,2.36,1501.6949152542372,2018,Tandem Diabetes Care
TVIX,72.5,5.54,1208.6642599277975,2018,VS 2X VIX Short Term
AKER,1.35,0.13,938.4615384615385,2018,Akers Biosciences
UGLD,94.88,11.14,751.7055655296228,2018,VS 3X Gold
HEAR,14.53,1.8,707.2222222222221,2018,Turtle Beach Corp
USLV,72.45,11.61,524.031007751938,2018,VS 3X Silver
PRQR,15.59,3.225,383.4108527131783,2018,Proqr Therapeutics
AMRN,13.47,4.01,235.9102244389028,2018,Amarin Corp Ads
ARWR,12.16,3.68,230.43478260869568,2018,Arrowhead Pharma
CDNA,23.72,7.34,223.1607629427793,2018,Caredx Inc


In [59]:
val ticks = overYearDF.filter($"diff_percentage" > 20)
    .groupBy("ticker")
    .count
    .filter($"count" > 3)
    .select("ticker")
    .collect.map(_.getAs[String]("ticker"))



ticks = Array(ABMD, MTCH, VICR, FFWM, CARO, MKTX, ERI, MPWR, CWST, SIMO, BLFS, ZBRA, SHEN, EXPO, CEVA, LMAT, LULU, NVDA, SBBX, WIX, FIVE, BOOM, MRCY, UPLD, ADSK, AEIS, AZPN, PDEX, CDW, AVGO, PCTY, MMAC, FSBW, AMED, FIVN, MASI, VRSN, CYRXW, NFLX, ADBE, NSSC, WING, SKYW, LUNA, FSFG, TBK, APPF, MRTX, PATK, LHCG, PEGA, AMRN, AMZN, POOL, KTOS)


[ABMD, MTCH, VICR, FFWM, CARO, MKTX, ERI, MPWR, CWST, SIMO, BLFS, ZBRA, SHEN, EXPO, CEVA, LMAT, LULU, NVDA, SBBX, WIX, FIVE, BOOM, MRCY, UPLD, ADSK, AEIS, AZPN, PDEX, CDW, AVGO, PCTY, MMAC, FSBW, AMED, FIVN, MASI, VRSN, CYRXW, NFLX, ADBE, NSSC, WING, SKYW, LUNA, FSFG, TBK, APPF, MRTX, PATK, LHCG, PEGA, AMRN, AMZN, POOL, KTOS]

In [60]:
Vegas("Quotes Data").
  withDataFrame(overYearDF.filter($"ticker".isin(ticks.take(10): _*))).
  mark(Line).
  encodeX("year", Ordinal).
  encodeY("diff_percentage", Quantitative, scale=Scale(domainValues=List(-30.0, 200.0))).
  encodeColor("ticker", Nominal).
  show



In [17]:
fullDF
    .filter($"ticker".isin(FAANG:_*))
    .display()



ticker,end_of_period,start_of_period,diff_percentage,year
AAPL,158.53,169.23,-6.322756012527324,2018
AMZN,1510.8,1169.47,29.186725610746745,2018
FB,134.45,176.46,-23.80709509237222,2018
GOOG,1050.96,1046.4,0.4357798165137562,2018
MSFT,101.29,85.54,18.41243862520458,2018
NFLX,260.16,191.96,35.52823504896854,2018
AAPL,290.2,157.74,83.97362748827182,2019
AMZN,1842.0,1501.97,22.63893419975099,2019
FB,203.995,131.09,55.61446334579296,2019
GOOG,1330.11,1035.61,28.437346105194045,2019


In [22]:
Vegas("Quotes Data").
  withDataFrame(fullDF.filter($"ticker".isin(FAANG:_*))).
  mark(Line).
  encodeX("year", Ordinal).
  encodeY("diff_percentage", Quantitative, scale=Scale(domainValues=List(-30.0, 200.0))).
  encodeColor("ticker", Nominal).
  show

In [43]:
val growth2017DF = getSafeComparisionDFWithDates(20170101, 20171231)

trading day is 20170102
trading day is 20180101


growth2017DF = [ticker: string, end_of_period: double ... 2 more fields]


[ticker: string, end_of_period: double ... 2 more fields]

In [34]:
growth2017DF
    .filter($"ticker".isin(FAANG:_*))
    .display()

ticker,end_of_period,start_of_period,diff_percentage
AAPL,169.23,115.82,46.114660680366086
AMZN,1169.47,749.87,55.95636577006682
FB,176.46,115.05,53.376792698826605
GOOG,1046.4,771.82,35.575652354175844
MSFT,85.54,62.14,37.65690376569039
NFLX,191.96,123.8,55.05654281098546


In [45]:
growth2017DF
    .filter($"ticker".isin("TSLA"))
    .display()

ticker,end_of_period,start_of_period,diff_percentage
TSLA,311.35,213.69,45.701717441153086


In [24]:
def getYearDF(year: String) = {
    val startOfYear = s"${year}0101"
    val endOfYear = s"${year}1231"
    spark.table(tableName)
        .filter($"date" >= startOfYear)
        .filter($"date" <= endOfYear)
        .groupBy("ticker")
        .agg(
            org.apache.spark.sql.functions.min($"open") as s"min_${year}",
            org.apache.spark.sql.functions.max($"open") as s"max_${year}",
            org.apache.spark.sql.functions.avg($"open") as s"avg_${year}"
        )
        .cache()
}

getYearDF: (year: String)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]


In [20]:
val df2019 = getYearDF("2019")

df2019 = [ticker: string, min_2019: double ... 2 more fields]


[ticker: string, min_2019: double ... 2 more fields]

In [21]:
df2019
    .filter($"ticker".isin(FAANG:_*))
    .display()



ticker,min_2019,max_2019,avg_2019
MSFT,99.55,159.45,130.2564367816092
FB,128.99,208.6697,181.40645478927203
NFLX,255.71,382.77,328.8103065134099
AAPL,143.98,291.12,207.7258620689655
GOOG,1016.57,1363.35,1186.6421455938698
AMZN,1465.2,2025.62,1787.6514176245214


In [39]:
val compareDF = getSafeComparisionDFWithDates(20200802, 20201214)

trading day is 20200803
trading day is 20201214


compareDF = [ticker: string, end_of_period: double ... 2 more fields]


[ticker: string, end_of_period: double ... 2 more fields]

In [43]:
addNames(compareDF)
    .filter($"end_of_period" > 10.0)
    .orderBy($"diff_percentage".desc)
    .show(300, false)

+------+-------------+---------------+------------------+------------------------------------------------+
|ticker|end_of_period|start_of_period|diff_percentage   |name                                            |
+------+-------------+---------------+------------------+------------------------------------------------+
|OAS   |38.91        |0.6307         |6069.335658791818 |Oasis Petroleum Inc                             |
|HJLI  |13.22        |0.3341         |3856.8991319964084|Hancock Jaffe Laboratories Inc                  |
|NCSM  |19.58        |0.5626         |3380.270174191255 |Ncs Multistage Holdings Inc                     |
|MGEN  |18.7325      |1.11           |1587.6126126126128|Miragen Therapeutics Inc                        |
|MCRB  |24.44        |3.84           |536.4583333333334 |Seres Theraptc                                  |
|PACB  |22.21        |3.76           |490.6914893617023 |Pacific Biosciences                             |
|PRPH  |10.28        |1.81           

In [44]:
spark.stop()

In [12]:
val withNamesDF = compareDF.join(namesDF, compareDF("ticker") === namesDF("ticker"))
    .drop(namesDF("ticker"))

withNamesDF = [ticker: string, end_of_period: double ... 3 more fields]


[ticker: string, end_of_period: double ... 3 more fields]

In [14]:
withNamesDF
    .filter($"end_of_period" > 10.0)
    .withColumn("diff_percentage", $"diff_percentage" * 100.0)
    .orderBy($"diff_percentage".asc).display

ticker,end_of_period,start_of_period,diff_percentage,name
RRGB,12.12,33.08,-63.3615477629988,Red Robin Gourmet Burgers Inc
MTSC,17.86,48.33,-63.04572729153735,M T S Systems Cp
CUTR,13.57,36.18,-62.4930901050304,Cutera Inc
ATRO,10.57,28.15,-62.45115452930728,Astronics Cp
ZIV,28.5,73.03,-60.97494180473778,VS -1X VIX Mid Term
NBLX,10.45,26.69,-60.84675908579993,Noble Midstream Partners LP
GIII,13.97,34.37,-59.35408786732616,G-III Apparel Gp
SOHON,10.75,26.114,-58.83434173240407,Sotherly Hotels Inc Perp Pfd Ser D
WLFC,24.81,59.46,-58.2744702320888,Willis Lease Fin C
SOHOO,10.76,25.7501,-58.21375450969123,Sotherly Hotels LP


In [16]:
compareDF
    .filter($"end_of_period" > 10.0)
    .withColumn("diff_percentage", diff_percentage * 100.0)
    .orderBy($"diff_percentage".asc).display

ticker,end_of_period,start_of_period,diff_percentage
WLFC,21.16,59.46,-0.6441305079044736
PVAC,11.66,30.69,-0.6200716845878136
SOHOB,10.2,26.4,-0.6136363636363636
UAL,35.77,89.57,-0.6006475382382493
CUTR,14.49,36.18,-0.599502487562189
MTSC,19.44,48.33,-0.5977653631284916
SOHOO,10.38,25.7501,-0.5968947693407015
PLAY,16.5,40.48,-0.592391304347826
ATRO,11.81,28.15,-0.5804618117229129
GIII,14.5,34.37,-0.5781204538842013


In [19]:
val week52High = spark.table(tableName)
    .filter($"date" < 20200101)
    .filter($"date" > 20190101)
    .groupBy("ticker")
    .agg(max($"open") as "open")
    .cache()



week52High = [ticker: string, open: double]


[ticker: string, open: double]

In [20]:
week52High.filter($"ticker" === "MSFT").show()

|ticker|  open|
+------+------+
|  MSFT|159.45|
+------+------+



#### Also, just check how the average prices of stocks have gone higher.  So, if you would have bought it every day, you still would have been in a better position now.

Now, what happens if you would have bought it on days when there was a fall? What would the gains look like.

In [1]:
val compareWithMaxDF = getComparisionDF(week52High, readDF(20200604))

Name: Compile Error
Message: <console>:58: error: not found: value week52High
       val compareWithMaxDF = getComparisionDF(week52High, readDF(20200604))
                                               ^

StackTrace: 

In [23]:
compareWithMaxDF
    .filter($"end_of_period" > 10.0)
    .filter($"ticker" === "MSFT")
    .orderBy($"diff_percentage".asc).display

ticker,end_of_period,start_of_period,diff_percentage
MSFT,184.3,159.45,0.1558482282847289


In [22]:
compareWithMaxDF
    .filter($"end_of_period" > 10.0)
    .orderBy($"diff_percentage".asc).display

ticker,end_of_period,start_of_period,diff_percentage
TLRY,10.13,99.0,-0.8976767676767677
SAGE,35.92,190.7,-0.8116413214472993
PVAC,11.66,57.79,-0.7982349887523792
BNFT,13.63,60.0,-0.7728333333333333
ANAB,18.94,81.23,-0.7668349132094054
INGN,38.08,152.87,-0.750899457055014
PBYI,10.96,43.26,-0.7466481738326398
ATRA,10.42,41.08,-0.74634858812074
DMRC,17.04,65.42,-0.7395291959645368
GGAL,10.21,38.24,-0.7330020920502092


In [None]:
org.apache.spark.sql.functions.min

In [16]:
val summary2019DF = spark.table(tableName)
    .filter($"date" >= 20190101)
    .filter($"date" > 20190101)
    .groupBy("ticker")
    .agg(
        org.apache.spark.sql.functions.min($"open") as "min_2019",
        org.apache.spark.sql.functions.max($"open") as "max_2019",
        org.apache.spark.sql.functions.avg($"open") as "avg_2019"
    )
    .cache()



summary2019DF = [ticker: string, min_2019: double ... 2 more fields]


[ticker: string, min_2019: double ... 2 more fields]

In [44]:
val FAANG = List("FB", "AAPL", "AMZN", "NFLX", "GOOG", "MSFT")

FAANG = List(FB, AAPL, AMZN, NFLX, GOOG, MSFT)


List(FB, AAPL, AMZN, NFLX, GOOG, MSFT)

percentageGrowthUDF: org.apache.spark.sql.expressions.UserDefinedFunction


In [45]:
summary2019DF
    .filter($"ticker".isin(FAANG:_*))
    .display

Name: Compile Error
Message: <console>:56: error: not found: value summary2019DF
       summary2019DF
       ^

StackTrace: 

In [21]:
val fatDF = withNamesDF.join(summary2019DF, summary2019DF("ticker") === withNamesDF("ticker"))
    .drop(summary2019DF("ticker"))

fatDF = [ticker: string, end_of_period: double ... 6 more fields]


[ticker: string, end_of_period: double ... 6 more fields]

In [46]:
fatDF
    .filter($"end_of_period" > 10.0)
    .withColumn("diff_percentage", $"diff_percentage" * 100.0)
    .orderBy($"diff_percentage".desc).display

[Stage 221:>                                                        (0 + 2) / 2]

ticker,end_of_period,start_of_period,diff_percentage,name,min_2019,max_2019,avg_2019
SRRA,15.65,0.355,4308.450704225352,Sierra Oncology Inc,0.22,2.0,0.9052434615384616
BLPH,13.5,0.36,3650.0,Bellerophon Ther Com,0.33,1.09,0.6068000000000001
UONE,40.75,1.81,2151.381215469613,Urban One Inc,1.72,2.76,2.1226619230769237
CODX,15.95,0.9,1672.2222222222222,Co-Diagnostics Inc,0.7351,3.24,1.0729103846153842
NVAX,63.28,3.99,1485.9649122807018,Novavax Inc,3.7,46.8,12.280407692307696
AHPI,10.68,1.22,775.4098360655737,Allied Healthcare,0.92,2.1,1.577083076923077
GRPN,21.02,2.42,768.5950413223142,Groupon Cl A,2.25,3.79,3.1845
AIRTP,20.9499,2.48,744.7540322580645,Air T Inc Funding Alpha Income Trust Preferred,2.08,3.16,2.4508157534246577
LTRPB,51.85,7.25,615.1724137931035,Liberty Tripadv B,6.739,18.93,12.439446486486483
VERI,17.08,2.53,575.098814229249,Veritone Inc,2.37,9.64,5.411423076923077


In [36]:
fatDF
    .filter($"end_of_period" > 10.0)
    .withColumn("diff_percentage", $"diff_percentage" * 100.0)
    .withColumn("2019_diff_percentage", percentageGrowthUDF($"min_2019", $"end_of_period"))
    .withColumn("2019_growth", percentageGrowthUDF($"min_2019", $"max_2019"))
    .orderBy($"diff_percentage".asc).display

ticker,end_of_period,start_of_period,diff_percentage,name,min_2019,max_2019,avg_2019,2019_diff_percentage,2019_growth
RRGB,12.12,33.08,-63.3615477629988,Red Robin Gourmet Burgers Inc,25.51,35.54,30.97219230769231,-52.489219913759314,39.317914543316334
MTSC,17.86,48.33,-63.04572729153735,M T S Systems Cp,39.67,60.75,54.12365384615384,-54.978573229140416,53.13839173178724
CUTR,13.57,36.18,-62.4930901050304,Cutera Inc,12.42,38.94,23.776884615384624,9.25925925925926,213.52657004830917
ATRO,10.57,28.15,-62.45115452930728,Astronics Cp,26.06,43.78,32.85825,-59.43975441289332,67.99693016116655
ZIV,28.5,73.03,-60.97494180473778,VS -1X VIX Mid Term,59.9,76.98,70.72547576923075,-52.42070116861436,28.514190317195336
NBLX,10.45,26.69,-60.84675908579993,Noble Midstream Partners LP,26.45,27.8,27.125,-60.49149338374291,5.103969754253314
GIII,13.97,34.37,-59.35408786732616,G-III Apparel Gp,18.53,43.63,30.524153846153848,-24.608742579600648,135.45601726929303
SOHON,10.75,26.114,-58.83434173240407,Sotherly Hotels Inc Perp Pfd Ser D,25.1,26.98,25.835902222222234,-57.17131474103586,7.490039840637445
WLFC,24.81,59.46,-58.2744702320888,Willis Lease Fin C,34.44,71.37,52.419380384615394,-27.961672473867598,107.22996515679446
SOHOO,10.76,25.7501,-58.21375450969123,Sotherly Hotels LP,22.15,26.3,25.070528461538466,-51.42212189616253,18.73589164785554


### How about looking at companies that had a positive growth in 2019, 2018 and are below pre-pandemic numbers. 

In [25]:
val compareWithLowDF = getComparisionDF(week52Low, readDF(20200604))

compareWithLowDF = [ticker: string, end_of_period: double ... 2 more fields]


[ticker: string, end_of_period: double ... 2 more fields]

In [26]:
compareWithLowDF
    .filter($"end_of_period" > 10.0)
    .filter($"ticker" === "MSFT")
    .orderBy($"diff_percentage".asc).display



ticker,end_of_period,start_of_period,diff_percentage
MSFT,184.3,99.55,0.8513309894525366


In [27]:
compareWithLowDF
    .filter($"end_of_period" > 10.0)
    .orderBy($"diff_percentage".asc).display

ticker,end_of_period,start_of_period,diff_percentage
SOHOB,10.2,24.03,-0.5755305867665419
PLAY,16.5,37.28,-0.5574034334763949
ATRO,11.81,26.06,-0.5468150422102839
UAL,35.77,77.38,-0.5377358490566037
SOHOO,10.38,22.15,-0.5313769751693002
PVAC,11.66,23.96,-0.5133555926544241
MTSC,19.44,39.67,-0.5099571464582808
VNOM,11.35,22.33,-0.4917151813703537
AAL,12.85,24.41,-0.4735764031134781
ZIV,31.75,59.9,-0.4699499165275459


In [8]:
compareWithLowDF.filter($"ticker".isin("ROKU", "MSFT", "FB", "NFLX")).display

ticker,end_of_period,start_of_period,diff_percentage
MSFT,184.3,99.55,0.8513309894525366
ROKU,104.0,29.82,2.4875922199865865
FB,229.56,128.99,0.7796728428560352
NFLX,422.39,255.71,0.6518321536115129


In [9]:
compareWithMaxDF.filter($"ticker".isin("ROKU", "MSFT", "FB", "NFLX")).display

ticker,end_of_period,start_of_period,diff_percentage
MSFT,184.3,159.45,0.1558482282847289
ROKU,104.0,173.6,-0.4009216589861751
FB,229.56,208.6697,0.1001118034865627
NFLX,422.39,382.77,0.1035086344279855


In [10]:
val week52Avg = spark.table(tableName)
    .filter($"date" < 20200101)
    .filter($"date" > 20190101)
    .groupBy("ticker")
    .agg(avg($"open") as "open")
    .cache()

week52Avg = [ticker: string, open: double]


[ticker: string, open: double]

In [11]:
week52Avg.filter($"ticker".isin("ROKU", "MSFT", "FB", "NFLX")).display



ticker,open
MSFT,130.36676923076922
ROKU,98.66103846153842
FB,181.5999796153846
NFLX,329.04549999999995


In [12]:
val compareWithAvgDF = getComparisionDF(week52Avg, readDF(20200604))

compareWithAvgDF = [ticker: string, end_of_period: double ... 2 more fields]


[ticker: string, end_of_period: double ... 2 more fields]

In [16]:
compareWithAvgDF
    .filter($"end_of_period" > 10.0)
    .withColumn("diff_percentage", $"diff_percentage" * 100.0)
    .orderBy($"diff_percentage".asc).display

ticker,end_of_period,start_of_period,diff_percentage
TLRY,10.13,45.14019230769232,-77.55880185234888
SAGE,35.92,151.34496153846152,-76.2661408514273
PVAC,11.66,37.05984615384615,-68.53737613589661
ANAB,18.94,52.86623076923079,-64.17372730301881
MTSC,19.44,54.12365384615384,-64.08224755989666
ATRO,11.81,32.85825,-64.05773283726307
PLAY,16.5,45.02559615384615,-63.35417760239794
VNOM,11.35,29.440192307692303,-61.44726270339476
SOHOB,10.2,25.382697307692304,-59.81514542621576
WLFC,21.16,52.419380384615366,-59.63325044145261


In [28]:
import com.netflix.boson.util.DateInt
val d = DateInt(20200201)

d = com.netflix.boson.util.DateInt@1343b09


20200201

In [29]:
d.toLocalDate.getDayOfWeek

SATURDAY

In [1]:
import java.util.concurrent.TimeUnit

In [2]:
TimeUnit.DAYS.toSeconds(365)

31536000

In [3]:
val m = Map(1 -> 2, 3 -> 4)

m = Map(1 -> 2, 3 -> 4)


Map(1 -> 2, 3 -> 4)

In [7]:
m.values.sum < 1.0

false

In [1]:
List(1, 2, 3).toSet == List(3, 2, 1).toSet

true

In [1]:
var i:Int = 40

i = 40


40

In [3]:
val i1: Char = i.asInstanceOf[Char]

i1 = (


lastException: Throwable = null


(

In [4]:
i1

(