In [1]:
### SPARK imports

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql import Window

from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler


In [2]:
### Regular python imports

import getpass

In [3]:
#Set the paths here!
# Every path is chosen from the login name of whoever runs the notebook.
# The loading cell further down reads from cvrParquetPath, so each branch
# has to define it.

user = getpass.getuser()
if user == "sidsel":
    parquet = "/home/"+user+"/workspace/sparkdata/parquet/"
    regnskabsData = "alleregnskaber.parquet"
    cvrData = "virkdata.parquet"
    csvpath = "/home/"+user+'/workspace/sparkdata/csv'
    # This branch used to leave cvrParquetPath / regnskabParquetPath undefined,
    # which made the loading cell fail with a NameError for this user.
    # NOTE(review): assumes the Ikt*.parquet files sit in the same parquet
    # directory as the other data sets -- confirm against the real layout.
    cvrParquetPath = parquet
    regnskabParquetPath = parquet

elif user == "svanhmic":
    cvrParquetPath = "/home/"+user+"/workspace/Python/Erhvervs/data/cdata/parquet/"
    regnskabParquetPath = "/home/"+user+"/workspace/Python/Erhvervs/data/regnskabsdata/sparkdata/parquet/" 
    csvpath = "/home/"+user+'/workspace/Python/Erhvervs/data/regnskabsdata'

else:
    # Say so here instead of letting a later cell die on an undefined name.
    print("No data paths configured for user '"+user+"'; add a branch above before running the loading cells")

In [4]:
# All rows of one company, unordered: used for per-company aggregates (max).
zeroYearWindow = Window.partitionBy("cvr")
# Same partitioning with rows ordered by End_Instant: used for lag() and rank().
laggValueWindow = zeroYearWindow.orderBy("End_Instant")

In [20]:
#import data and do transformations

# Tunable values, named here instead of being buried in the chain below.
MIN_REVENUE = 1000000        # threshold behind the FirstRevenue flag
MIN_GROSS_RESULT = 500000    # threshold behind the FirstGrossResult flag
MIN_MAX_YEAR = 3             # Year starts at 0, so this keeps companies with at least four remaining rows
ONLY_CVR = 10008123          # restrict the data to one company; set to None to keep all of them

iktBrancheDf = sqlContext.read.parquet(cvrParquetPath+"IktBrancher.parquet").cache()
notIktGazelleDf = sqlContext.read.parquet(cvrParquetPath+"IktNotGazeller.parquet")
iktGazelleDf = sqlContext.read.parquet(cvrParquetPath+"IktGazeller.parquet")

notIktGazelleDf = (notIktGazelleDf
                   .drop("rank")
                   # regression target and numeric revenue
                   .withColumn("label",F.col("GrossResult").cast("double"))
                   .withColumn("Revenue",F.col("Revenue").cast("double"))
                   # change in GrossResult versus the company's previous row (by End_Instant);
                   # the first row of a company has no predecessor, so the value is null
                   .withColumn("LaggedGrossResult",F.col("GrossResult")-F.lag(F.col("GrossResult")).over(laggValueWindow))
                   # keeps non-decreasing rows only; the null first rows are dropped here as well
                   .filter(F.col("LaggedGrossResult") >= 0.0)
                   .withColumn("FirstRevenue",F.when(F.col("Revenue") >= MIN_REVENUE,True).otherwise(False))
                   .withColumn("FirstGrossResult",F.when(F.col("GrossResult") >= MIN_GROSS_RESULT,True).otherwise(False))
                   # only FirstGrossResult is filtered on; FirstRevenue is kept as a plain column
                   .filter((F.col("FirstGrossResult")))
                   # zero-based position of the row among the company's remaining rows
                   .withColumn("Year", F.rank().over(laggValueWindow).cast("double")-1)
                   .withColumn("maxRank",F.max("Year").over(zeroYearWindow).cast("double"))
                   .filter(F.col("maxRank") >= MIN_MAX_YEAR) # we want to predict for at least four values
                   .drop("maxRank")
               )

if ONLY_CVR is not None:
    notIktGazelleDf = notIktGazelleDf.filter(F.col("cvr") == ONLY_CVR)
notIktGazelleDf = notIktGazelleDf.cache()

# "rank" was dropped at the top of the chain, so sort the preview on a column
# that is actually part of the result.
notIktGazelleDf.orderBy("cvr","End_Instant").show()
#notIktGazelleDf.printSchema()

+--------+-----------+-------+-----------+-----------+---------------+-------+-----------------+--------------------+---------------------+-----------------+--------+-----------------+------------+----------------+----+
|     cvr|branchekode|unitRef|End_Instant|GrossResult|GrossProfitLoss|Revenue|     branchetekst|         Branchenavn|Standardgruppering_19|      IKT-områder|   label|LaggedGrossResult|FirstRevenue|FirstGrossResult|Year|
+--------+-----------+-------+-----------+-----------+---------------+-------+-----------------+--------------------+---------------------+-----------------+--------+-----------------+------------+----------------+----+
|10008123|     611000|    DKK| 2012-12-31|   2.8072E7|            0.0|    0.0|Telekommunikation|Fastnetbaseret te...| Information og ko...|Telekommunikation|2.8072E7|              0.0|       false|            true| 0.0|
|10008123|     611000|    DKK| 2012-12-31|   2.8072E7|       2.8073E7|    0.0|Telekommunikation|Fastnetbaseret te...| In

In [32]:
#Build the pipeline here!

# Stage 1: pack the single predictor column "Year" into the vector column "features".
vecAss = VectorAssembler(outputCol="features", inputCols=["Year"])

# Stage 2: linear regression that reads its target from the "label" column.
lg = LinearRegression(labelCol="label")

# Both stages chained in that order.
pipe = Pipeline(stages=[vecAss, lg])


In [33]:
# Candidate settings: maxIter and featuresCol are fixed for every candidate,
# elasticNetParam and regParam are each varied over two values.
paramGrid = (ParamGridBuilder()
             .baseOn({lg.maxIter: 10,
                      lg.featuresCol: vecAss.getOutputCol()})
             .addGrid(lg.elasticNetParam, [0.5, 0.1])
             .addGrid(lg.regParam, [0.1, 0.01])
             .build())

# Cross-validator that fits `pipe` once per candidate and fold.
crossval = CrossValidator(
    estimator=pipe,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(),
    numFolds=2,  # use 3+ folds in practice
)

In [39]:
def executeLinearRegressionsYearVar(df,targetCol,crossval,maxYear=6):
    """
        Fit one model per distinct value of `targetCol` (e.g. one per cvr number).

        df        : Spark DataFrame with the training rows; it must contain
                    `targetCol` and every column `crossval` needs for fitting.
        targetCol : name of the column whose distinct values define the groups.
        crossval  : any object with a fit(DataFrame) method.
        maxYear   : not used by the fitting; kept so existing calls that pass
                    it keep working.

        Returns a dict mapping each distinct value of `targetCol` to the model
        fitted on that group's rows. Groups with fewer than three rows, and
        groups whose fit raises, are reported on stdout and left out.
    """

    #Extract unique values of the grouping column (the cvr numbers)
    uniqueTargetValues = [row[0] for row in df.select(targetCol).distinct().collect()]

    modelsPrCvr = {} # add all models with cvr numbers in this one.

    #Run through all cvr-numbers
    for company in uniqueTargetValues:
        # Built outside the try so the error handler can always show it.
        tempDf = df.filter(F.col(targetCol) == company)
        try:
            rowCount = tempDf.count()  # counted once; every count() is a Spark job
            if rowCount < 3:
                print("this should not happen: "+str(rowCount))
            else:
                modelsPrCvr[company] = crossval.fit(tempDf) # create model
        except Exception as err:
            # Best effort: report the failing group and carry on with the rest.
            # Catching Exception (not a bare except) lets Ctrl-C still stop the loop.
            print("could not fit a model for "+str(company)+": "+repr(err))
            tempDf.show()

    return modelsPrCvr
    
def evaluateAllModels(dict,evaluator,df=None,targetCol="cvr"):
    """
        Score every fitted model on the rows of the group it belongs to.

        dict      : mapping from group value (e.g. cvr number) to a fitted model
                    with a transform(DataFrame) method. The name shadows the
                    builtin; it is kept because it is part of the signature.
        evaluator : object with an evaluate(DataFrame) method. When None, a
                    RegressionEvaluator(labelCol="label") is created.
        df        : Spark DataFrame holding `targetCol`, the model's input
                    columns and the label column the evaluator reads.
        targetCol : column used to pick the rows that belong to each key.

        Returns a dict mapping each key of `dict` to the metric the evaluator
        computed for that model. If `df` is the data the models were fitted
        on, the metrics are in-sample.

        Raises ValueError when there are models to score but no `df`: a metric
        cannot be computed without labelled rows.
    """
    if not dict:
        return {}
    if df is None:
        raise ValueError("evaluateAllModels needs a labelled DataFrame (df=...) to score the models")
    if evaluator is None:
        evaluator = RegressionEvaluator(labelCol="label")

    metrics = {}
    for idx,val in dict.items():
        # predictions for this group's rows only, then one metric per model
        predictions = val.transform(df.filter(F.col(targetCol) == idx))
        metrics[idx] = evaluator.evaluate(predictions)
    return metrics
    

In [35]:
# Row counts of the three data sets, printed one per line.
for caption, frame in (("entire IKT: ", iktBrancheDf),
                       ("gazels in IKT: ", iktGazelleDf),
                       ("Non-gazels in IKT: ", notIktGazelleDf)):
    print(caption + str(frame.count()))

entire IKT: 41410
gazels in IKT: 383
Non-gazels in IKT: 7


In [36]:
#notIktGazelleDf.show()

In [37]:
# Fit one model per cvr number. The plain pipeline `pipe` is handed in as the
# estimator, so no search over `paramGrid` takes place in this call.
allModels = executeLinearRegressionsYearVar(df=notIktGazelleDf,
                                            targetCol="cvr",
                                            crossval=pipe)

In [38]:
# Predictions for Year 1..5 from the model fitted for cvr 10008123.
# executeLinearRegressionsYearVar stores the fitted model itself under each key,
# so the model is applied to an explicit Year frame instead of being indexed with [1].
futureYears = sqlContext.range(1, 6).withColumnRenamed("id", "Year")
allModels[10008123].transform(futureYears).show()

+----+--------+--------------------+
|Year|features|          prediction|
+----+--------+--------------------+
|   1|   [1.0]|2.9647951923076916E7|
|   2|   [2.0]|3.1290346153846152E7|
|   3|   [3.0]|3.2932740384615384E7|
|   4|   [4.0]| 3.457513461538462E7|
|   5|   [5.0]|3.6217528846153855E7|
+----+--------+--------------------+

