In [2]:
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import re
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import RFormula
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
# import spark.implicits._


sparkconf = SparkConf().setAppName('Mernis')
sparkconf.set('spark.executor.memory','10g')
sparkconf.set('spark.driver.memory','10g')
sparkconf.set("spark.sql.debug.maxToStringFields", "100")
spark = (SparkSession
         .builder
         .appName("Mernis")
         .config(conf=sparkconf)
         .getOrCreate())

# sc = SparkContext.getOrCreate()
# 加载数据
file_path = '/root/myfile/mernis/data_dump.sql'

data = spark.sparkContext.textFile(file_path). \
    filter((lambda line: re.findall('^\d{6}', line))). \
    map(lambda line: line.split('\t')[:-1])

schema = "uid STRING, national_identifier STRING, first STRING, last STRING, mother_first STRING, " \
         "father_first STRING, gender STRING, birth_city STRING, date_of_birth STRING, " \
         "id_registration_city STRING, id_registration_district STRING, address_city STRING, " \
         "address_district STRING, address_neighborhood STRING,street_address STRING, " \
         "door_or_entrance_number STRING"

df = spark.createDataFrame(data, schema)
# total_count = df.count()  # total_count = 49611709
# print("The total number:", total_count)


def format_date(line):
    li = line.split('/')
    if len(li[2]) == 4 and 0 < len(li[1]) <= 2 and 0 < len(li[1]) <= 2:
        return li[2] + '-' + li[1].zfill(2) + '-' + li[0].zfill(2)
    else:
        return 'null'


format_date_udf = udf(format_date, returnType=StringType())

df.createOrReplaceTempView('citizens')
df_format_date = df.withColumn("date_of_birth", format_date_udf(df["date_of_birth"]))
df_format_date = df_format_date.filter(expr("""date_of_birth != 'null'"""))
df_format_date = df_format_date.withColumn('date_of_birth', to_date('date_of_birth')).\
    withColumn('month_of_birth', month('date_of_birth')).\
    withColumn('year_of_birth', year('date_of_birth'))
df_format_date.show(3)

# df_age = df_format_date.withColumn('age', (round(months_between(
#         to_date(lit('2009-12-31')), col('date_of_birth')) / 12, 0)).cast('float'))
# df_age.show(10)

+------+-------------------+--------+--------+------------+------------+------+----------+-------------+--------------------+------------------------+------------+----------------+--------------------+----------------+-----------------------+--------------+-------------+
|   uid|national_identifier|   first|    last|mother_first|father_first|gender|birth_city|date_of_birth|id_registration_city|id_registration_district|address_city|address_district|address_neighborhood|  street_address|door_or_entrance_number|month_of_birth|year_of_birth|
+------+-------------------+--------+--------+------------+------------+------+----------+-------------+--------------------+------------------------+------------+----------------+--------------------+----------------+-----------------------+--------------+-------------+
|291990|        23480340824|NESLIHAN|  ZENGIN|      ZEYCAN|       OSMAN|     K|    KANGAL|   1978-06-10|             MALATYA|                KULUNCAK|     MALATYA|        KULUNCAK|    

In [38]:
# # N6

# The top10 city with most citizens
df_n6 = df_format_date.\
    select('address_city').\
    groupBy('address_city').\
    agg(count('*').alias('total')).\
    orderBy('total',ascending=False).\
    limit(10)

sc = SparkContext.getOrCreate()
area = [('ADANA',14030),('ISTANBUL',5343),('BURSA',10891),('IZMIR',7340),('AYDIN',8007),\
       ('ANKARA',30715),('ANTALYA',1417),('KOCAELI',3418),('KONYA',38257),('MERSIN',15737)]

df_area = spark.createDataFrame(area,['address_city','area'])
df_area = df_n6.join(df_area,'address_city','left_outer').orderBy('area')
df_area.show(10)
density_df = df_area.withColumn('desity',round(df_area['total'] / df_area['area'],2) )
density_df.show(10)


+------------+-------+-----+-------+
|address_city|  total| area| desity|
+------------+-------+-----+-------+
|     ANTALYA|1288425| 1417| 909.26|
|     KOCAELI|1017832| 3418| 297.79|
|    ISTANBUL|8821021| 5343|1650.95|
|       IZMIR|2787611| 7340| 379.78|
|       AYDIN|1410156| 8007| 176.12|
|       BURSA|1782462|10891| 163.66|
|       ADANA|1388836|14030|  98.99|
|      MERSIN|1097789|15737|  69.76|
|      ANKARA|3077939|30715| 100.21|
|       KONYA|1330357|38257|  34.77|
+------------+-------+-----+-------+



In [46]:
total_num = 49611709
df_n7_district = df_format_date.\
    select('id_registration_district','address_district').\
    filter(col('id_registration_district')!=col('address_district'))
print(type(df_n7_district.count()))
propor_district = df_n7_district.count() / total_num
print('Proportion of cross-district floating population:%.3f'%propor_district)

df_n7_city = df_format_date.\
    select('id_registration_city','address_city').\
    filter(col('id_registration_city')!=col('address_city'))
propor_city = df_n7_city.count() / total_num
print('Proportion of cross-city floating population:%.3f'%propor_city)

Proportion of cross-district floating population:0.523
Proportion of cross-city floating population:0.361


将出生日期中的年和月提取出来构成新的列,'year_of_birth'和'month_of_birth'，以便于转换成特征。由于总的数据量过大，从中抽取出4900余份样本进行训练和预测。

In [2]:
df_h1 = df_format_date.sample(False, 0.00005, seed = 2018)
df_h1.show(10)
df_h1 = df_h1.dropna()
print(df_h1.count())
feature_col = ['first','last','mother_first','father_first','gender','birth_city',
               'month_of_birth','year_of_birth','id_registration_city', 'id_registration_district',
               'address_district','address_neighborhood', 'street_address','address_city'
               ]

indexOutputCols = [x + '_Index' for x in feature_col]
oheOutputCols = [x + '_OHE' for x in feature_col]
stringIndexer_features = StringIndexer(inputCols=feature_col,outputCols=indexOutputCols,
                              handleInvalid="skip")
oheEncoder_features = OneHotEncoder(inputCols=indexOutputCols,outputCols=oheOutputCols)

# stringIndexer_labels = StringIndexer(inputCols=['address_city'],outputCols=['address_city_Index'])
# oheEncoder_labels = OneHotEncoder(inputCols=['address_city_Index'],outputCols=['address_city_OHE'])


pipeline = Pipeline(stages=[stringIndexer_features,oheEncoder_features])
model = pipeline.fit(df_h1)
res = model.transform(df_h1)

# Split the dataset into training, validation and test set with prob 0.7,0.2 and 0.1.
(trainingData, validData, testData) = res.randomSplit([0.7, 0.2,0.1], seed=100)
# trainingData.persist()
# validData.persist()
# testData.persist()
# print('Training Dataset Count:{}'.format(trainingData.count()))
# print('Vaildation Dataset Count:{}'.format(validData.count()))
# print('Test Dataset Count:{}'.format(testData.count()))




+------+-------------------+-----------------+--------+------------+------------+------+----------+-------------+--------------------+------------------------+------------+----------------+--------------------+--------------------+-----------------------+--------------+-------------+
|   uid|national_identifier|            first|    last|mother_first|father_first|gender|birth_city|date_of_birth|id_registration_city|id_registration_district|address_city|address_district|address_neighborhood|      street_address|door_or_entrance_number|month_of_birth|year_of_birth|
+------+-------------------+-----------------+--------+------------+------------+------+----------+-------------+--------------------+------------------------+------------+----------------+--------------------+--------------------+-----------------------+--------------+-------------+
|301613|        19211135418|            ERKAN|  BOZBAY|      CIGDEM|     MUSTAFA|     E| YESILYURT|   1980-04-14|             MALATYA|       YESI

DataFrame[uid: string, national_identifier: string, first: string, last: string, mother_first: string, father_first: string, gender: string, birth_city: string, date_of_birth: date, id_registration_city: string, id_registration_district: string, address_city: string, address_district: string, address_neighborhood: string, street_address: string, door_or_entrance_number: string, month_of_birth: int, year_of_birth: int, gender_Index: double, birth_city_Index: double, address_neighborhood_Index: double, last_Index: double, address_city_Index: double, id_registration_city_Index: double, month_of_birth_Index: double, father_first_Index: double, address_district_Index: double, first_Index: double, street_address_Index: double, year_of_birth_Index: double, mother_first_Index: double, id_registration_district_Index: double, address_city_OHE: vector, id_registration_district_OHE: vector, month_of_birth_OHE: vector, birth_city_OHE: vector, father_first_OHE: vector, id_registration_city_OHE: vect

In [17]:
vecAssembler = VectorAssembler(inputCols=oheOutputCols, outputCol='features')
res2 = vecAssembler.transform(res1)

In [17]:
#
# # H1. 某人所在城市的预测模型：给定一个人的所有信息（除了所在城市），预测这个人所在的城市。 分析该模型Top1到 Top

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 增加一列labels， 保留address_city的onehot编码
feature_col = ['first','last','mother_first','father_first','gender','birth_city',
               'month_of_birth','year_of_birth','id_registration_city', 'id_registration_district',
               'address_district','address_neighborhood', 'street_address'
               ]

# All the feature columns
oheOutputCols = [x + '_OHE' for x in feature_col]


# assemble all the feature columns
vecAssembler = VectorAssembler(inputCols=oheOutputCols, outputCol='features')
df_h1 = vecAssembler.transform(trainingData)

lr = LogisticRegression(featuresCol = 'features',labelCol='address_city_Index',
                        maxIter=100, regParam=0.3, elasticNetParam=0)
lrPipeline = Pipeline(stages = [vecAssembler,lr])
lrModel = lrPipeline.fit(trainingData)

def evaluate_h1(data):
    print(lrModel)
    print(lr.getRegParam())
    vecData = vecAssembler.transform(data)
    predictions = lrModel.transform(vecData)
    predictions.\
        select('national_identifier', 'probability', 'address_city_Index','prediction').\
        orderBy('probability',ascending=False).show(n=5,truncate=30)
    
    evaluator = MulticlassClassificationEvaluator(labelCol='address_city_Index',predictionCol='prediction')
    lrAcc = evaluator.evaluate(predictions)
    print('test accuracy = ',lrAcc)

In [19]:
lr.setRegParam(0.001)
lrModel = lrPipeline.fit(trainingData)
evaluate_h1(validData)

+-------------------+------------------------------+------------------+----------+
|national_identifier|                   probability|address_city_Index|prediction|
+-------------------+------------------------------+------------------+----------+
|        49636475318|[0.9981634158524352,1.26612...|               0.0|       0.0|
|        29641925480|[0.9951736834914906,4.32659...|               0.0|       0.0|
|        44974414456|[0.9947927854059504,4.02437...|               0.0|       0.0|
|        51211204220|[0.9936831228104824,0.00122...|               0.0|       0.0|
|        52075005138|[0.9935156794485108,0.00159...|               0.0|       0.0|
+-------------------+------------------------------+------------------+----------+
only showing top 5 rows

test accuracy =  0.6278536819300573


In [22]:
lrModel.setRegParam(0.01)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(validData)

In [23]:
predictions.\
        select('national_identifier', 'probability', 'labels','prediction').\
        orderBy('probability',ascending=False).show(n=5,truncate=30)
    
evaluator = MulticlassClassificationEvaluator(labelCol='labels',predictionCol='prediction')
lrAcc= evaluator.evaluate(predictions)
print('test accuracy = ',lrAcc)

+-------------------+------------------------------+------+----------+
|national_identifier|                   probability|labels|prediction|
+-------------------+------------------------------+------+----------+
|        49636475318|[0.986710724245762,9.223238...|   0.0|       0.0|
|        29641925480|[0.9747161839881305,0.00211...|   0.0|       0.0|
|        44974414456|[0.9728858078534585,0.00206...|   0.0|       0.0|
|        52075005138|[0.9691073561203233,0.00591...|   0.0|       0.0|
|        51211204220|[0.9688285044137115,0.00479...|   0.0|       0.0|
+-------------------+------------------------------+------+----------+
only showing top 5 rows

test accuracy =  0.6049935385132912


In [None]:
evaluate_h1(testData)

In [3]:
# H2. Given all the information about one person, predict his/her gender.
feature_col = ['first','last','mother_first','father_first','birth_city', 'year_of_birth','month_of_birth',
               'id_registration_city', 'id_registration_district', 'address_city',
               'address_district','address_neighborhood', 'street_address'
               ]


# All the feature columns
oheOutputCols = [x + '_OHE' for x in feature_col]


# assemble all the feature columns
vecAssembler = VectorAssembler(inputCols=oheOutputCols, outputCol='features')
# df_h2 = vecAssembler.transform(res)

lr_h2 = LogisticRegression(featuresCol = 'features',labelCol='gender_Index',
                        maxIter=100, regParam=0.3, elasticNetParam=0)
lrPipeline_h2 = Pipeline(stages = [vecAssembler,lr_h2])
lrModel_h2 = lrPipeline_h2.fit(trainingData)

def evaluate_h2(data):
    print('RegParam=',lrModel_h2.getRegParam())
    predictions = lrModel_h2.transform(data)
    predictions.\
        select('national_identifier', 'probability','gender','gender_Index','prediction').\
        orderBy('probability',ascending=False).show(n=10,truncate=30)
    
    evaluator = MulticlassClassificationEvaluator(labelCol='gender_Index',predictionCol='prediction')
    lrAcc = evaluator.evaluate(predictions)
    print('test accuracy = ',lrAcc)

In [4]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluate_h2(validData)

PipelineModel_2800379a94fe
+-------------------+------------------------------+------+------------+----------+
|national_identifier|                   probability|gender|gender_Index|prediction|
+-------------------+------------------------------+------+------------+----------+
|        63643167882|[0.9937419609106374,0.00625...|     K|         0.0|       0.0|
|        14971091394|[0.9885102276275363,0.01148...|     K|         0.0|       0.0|
|        31871182540|[0.985789077849872,0.014210...|     K|         0.0|       0.0|
|        12026807310|[0.9844732109295973,0.01552...|     K|         0.0|       0.0|
|        19364511572|[0.9791942119328154,0.02080...|     K|         0.0|       0.0|
|        11804064914|[0.9771787586218412,0.02282...|     K|         0.0|       0.0|
|        55636492460|[0.9763369474628155,0.02366...|     K|         0.0|       0.0|
|        64681208308|[0.9759071351878916,0.02409...|     K|         0.0|       0.0|
|        15940910302|[0.9731263680462484,0.02687.

In [None]:
lr_h2.setRegParam(0.1)
lrPipeline_h2 = Pipeline(stages = [vecAssembler,lr_h2])
lrModel_h2 = lrPipeline_h2.fit(trainingData)


In [34]:
evaluate_h2(testData)

+-------------------+------------------------------+------+------------+----------+
|national_identifier|                   probability|gender|gender_Index|prediction|
+-------------------+------------------------------+------+------------+----------+
|        28066584470|[0.10917065393999323,0.8908...|     E|         1.0|       1.0|
|        10456467890|[0.9077924078473769,0.09220...|     K|         0.0|       0.0|
|        45157719504|[0.2192069146574132,0.78079...|     E|         1.0|       1.0|
|        39589937452|[0.18583600229712327,0.8141...|     E|         1.0|       1.0|
|        15161744076|[0.1957294514261375,0.80427...|     E|         1.0|       1.0|
|        23636482712|[0.06026563388594986,0.9397...|     E|         1.0|       1.0|
|        36113066848|[0.1722213335203935,0.82777...|     E|         1.0|       1.0|
|        53440491662|[0.9275128967501869,0.07248...|     K|         0.0|       0.0|
|        29069307332|[0.772681859165467,0.227318...|     E|         1.0|    

In [1]:
# H3. 姓名预测模型：假设给定一个人的所有信息（除了姓名），预测这个人最可能的姓氏。分析该 模型Top1到 Top 5的预测准确度；

feature_col = ['mother_first','father_first','birth_city','gender','year_of_birth','month_of_birth',
               'id_registration_city', 'id_registration_district', 'address_city',
               'address_district','address_neighborhood', 'street_address'
               ]

# 所有的特征列列名
oheOutputCols = [x + '_OHE' for x in feature_col]

# assemble all the feature columns
vecAssembler = VectorAssembler(inputCols=oheOutputCols, outputCol='features')
vecTrainDF_h3 = vecAssembler.transform(trainingData)
trainingData.show(3)
lr_h3 = LogisticRegression(featuresCol = 'features',labelCol='first_Index',
                        maxIter=100, regParam=0.01, elasticNetParam=0)
# lrPipeline_h3 = Pipeline(stages = [vecAssembler,lr_h3])
lrModel_h3 = lr_h3.fit(vecTrainDF_h3)

def evaluate_h3(data):
    print(lrModel_h3)
    vecData = vecAssembler.transform(data)
    predictions = lrModel_h3.transform(vecData)
    predictions.\
        select('national_identifier', 'probability', 'first','first_Index','prediction').\
        orderBy('probability',ascending=False).show(n=10,truncate=30)
    
    evaluator = MulticlassClassificationEvaluator(labelCol='first_Index',predictionCol='prediction')
    lrAcc = evaluator.evaluate(predictions)
    print('test accuracy = ',lrAcc)

In [None]:
+-------------------+----------------------------+-------+------ ---+----------+
|national_identifier |                  probability|   last|last_Index|prediction|
+-------------------+----------------------------+-------+------- --+----------+
|        34669954084|[0.0751656958984699,0.01306...|BAHADIR|     107.0|       0.0|
|        51250399116|[0.01734398137692374,0.0140...|  TANKI|     345.0|      59.0|
|        23258189716|[0.01582376988707977,0.0148...|   IRAK|     230.0|       0.0|
|        14543163580|[0.015195940098257876,0.015...|  SENER|     331.0|       1.0|
|        31243805398|[0.015019489320842194,0.015...| TURGUT|      52.0|       1.0|
|        48322738614|[0.014916421279069685,0.014...| COSKUN|     150.0|      16.0|
|        49633881822|[0.014897613581717132,0.015...|  KELES|      43.0|       1.0|
|        40459256184|[0.012225633323033705,0.043...|  PEKAK|     311.0|       1.0|
|        28873709622|[0.011274433627096122,0.015...| BAYRAM|     117.0|       1.0|
|        16295109878|[0.010925602529400297,0.015...| YENICE|     393.0|       1.0|
+-------------------+----------------------------+-------+----------+----------+
only showing top 10 rows


In [67]:
evaluate_h3(validData)

test accuracy =  0.0


In [3]:
evaluate_h3(testData)

In [5]:
 # H4. 人口预测模型：统计每一年出生的人数，预测下一年新增人口数。
df_h4 = df_format_date.withColumn('year_of_birth',year('date_of_birth'))
df_population = df_h4.\
    select("year_of_birth").\
    groupBy('year_of_birth').\
    agg(count('*').alias('total'))

df_population = df_population.withColumn('year',df_population['year_of_birth'].cast('int')).drop('year_of_birth')
df_population = df_population.filter(df_population['year'] > 1700)
df_population.orderBy('total').show(10)


+-----+----+
|total|year|
+-----+----+
|    1|1888|
|    2|1892|
|   22|1897|
|   27|1895|
|   32|1896|
|   48|1898|
|   61|1894|
|   66|1900|
|   79|1901|
|  108|1902|
+-----+----+
only showing top 10 rows



In [6]:
def to_index(year):
    return year-1888

to_index_udf = udf(to_index, returnType=IntegerType())
min_year = df_population.select(min('year').alias('year')).collect()[0]
print(min_year)
new_df = df_population.withColumn('index',to_index_udf(df_population['year']))
new_df.show()

(trianing, test) = new_df.randomSplit([0.8,0.2],seed = 2020)
trianinging.persist()
test.persist()


Row(year=1888)
+-------+----+-----+
|  total|year|index|
+-------+----+-----+
| 785662|1959|   71|
|1267087|1990|  102|
|     32|1896|    8|
|    113|1903|   15|
|1097333|1975|   87|
|1208944|1977|   89|
|      1|1888|    0|
|  82558|1924|   36|
|      2|1892|    4|
|1228686|1974|   86|
| 132554|1927|   39|
| 827701|1955|   67|
|1256035|1978|   90|
|  89810|1925|   37|
|    717|1908|   20|
| 765710|1961|   73|
| 369836|1942|   54|
| 305225|1939|   51|
| 382879|1944|   56|
|    118|1899|   11|
+-------+----+-----+
only showing top 20 rows



In [29]:
# from pyspark.ml.evaluation import RegressionEvaluator
# from pyspark.ml.regression import LinearRegression
# # from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# vecAssembler = VectorAssembler(inputCols=['index'],outputCol='features')
# vecTrainDF = vecAssembler.transform(trianing)
# lr_h4 = LinearRegression(featuresCol='features',labelCol='total')

# lrModel_h4 = lr_h4.fit(vecTrainDF)
# m = lrModel_h4.coefficients[0]
# b = lrModel_h4.intercept
print(f"""The formula for the linear regression lines is num = {m:.2f}*index{b:.2f}""")

The formula for the linear regression lines is num = 15412.37*index-326171.48


In [16]:
vecTestDF = vecAssembler.transform(test)
predictions = lrModel_h4.transform(vecTestDF)
predictions.orderBy('prediction',ascending=False).show(5)

+-------+----+-----+--------+------------------+
|  total|year|index|features|        prediction|
+-------+----+-----+--------+------------------+
|1242772|1987|   99|  [99.0]|1199653.5968508604|
|1250520|1983|   95|  [95.0]|1138004.0989916562|
| 976960|1969|   81|  [81.0]| 922230.8564844418|
|1011805|1964|   76|  [76.0]| 845168.9841604368|
| 827839|1956|   68|  [68.0]| 721869.9884420284|
+-------+----+-----+--------+------------------+
only showing top 5 rows



In [25]:
regresssionEvaluator = RegressionEvaluator(predictionCol='prediction',labelCol='total', metricName='r2')
r2 = regresssionEvaluator.evaluate(predictions)
print(f"r2 is {r2}")

r2 is 0.9323408621730506


In [24]:
from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import FloatType
from math import log

def log_num(num):
    if num:
        return log(num)
    else:
        return 0
    
log_num_udf = udf(log_num, returnType=FloatType())
log_df = new_df.withColumn('logTotal',log_num_udf(new_df['total']))
log_df.show()



+-------+----+-----+---------+
|  total|year|index| logTotal|
+-------+----+-----+---------+
| 785662|1959|   71|13.574282|
|1267087|1990|  102|14.052231|
|     32|1896|    8| 3.465736|
|    113|1903|   15| 4.727388|
|1097333|1975|   87|13.908393|
|1208944|1977|   89|14.005258|
|      1|1888|    0|      0.0|
|  82558|1924|   36|11.321257|
|      2|1892|    4|0.6931472|
|1228686|1974|   86|14.021456|
| 132554|1927|   39|11.794745|
| 827701|1955|   67|13.626408|
|1256035|1978|   90| 14.04347|
|  89810|1925|   37|11.405452|
|    717|1908|   20|6.5750756|
| 765710|1961|   73|13.548559|
| 369836|1942|   54|12.820815|
| 305225|1939|   51|12.628804|
| 382879|1944|   56|12.855474|
|    118|1899|   11|4.7706847|
+-------+----+-----+---------+
only showing top 20 rows



In [28]:
vecAssembler = VectorAssembler(inputCols=['index'],outputCol='features')
lr_h4_log = LinearRegression(featuresCol='features',labelCol='logTotal')

training_log = trianing.withColumn('logTotal',log_num_udf('total'))
vecTrainDF_log = vecAssembler.transform(training_log)
lrModel_h4_log = lr_h4_log.fit(vecTrainDF_log)
m_log = lrModel_h4_log.coefficients[0]
b_log = lrModel_h4_log.intercept
print(f"""The formula for the linear regression lines is log(total) = {m_log:.3f}*index+{b_log.3f}""")

# test
test_log = test.withColumn('logTotal',log_num_udf('total'))
vecTestDF_log = vecAssembler.transform(test_log)
predictions_log = lrModel_h4_log.transform(vecTestDF_log)
predictions_log.orderBy('prediction',ascending=False).show(10)



regresssionEvaluator = RegressionEvaluator(predictionCol='prediction',labelCol='logTotal', metricName='r2')
r2_log = regresssionEvaluator.evaluate(predictions_log)
print(f"r2 is {r2_log}")

The formula for the linear regression lines is log(num) = 0.10648180411118631*index5.356513516286501
+-------+----+-----+---------+--------+------------------+
|  total|year|index| logTotal|features|        prediction|
+-------+----+-----+---------+--------+------------------+
|1242772|1987|   99|14.032855|  [99.0]|15.898212123293947|
|1250520|1983|   95| 14.03907|  [95.0]|  15.4722849068492|
| 976960|1969|   81|13.792201|  [81.0]|13.981539649292593|
|1011805|1964|   76|13.827247|  [76.0]|13.449130628736661|
| 827839|1956|   68|13.626574|  [68.0]| 12.59727619584717|
| 699374|1954|   66|13.457941|  [66.0]|12.384312587624798|
| 610742|1953|   65| 13.32243|  [65.0]|12.277830783513611|
| 663528|1950|   62|13.405326|  [62.0]|11.958385371180054|
| 544884|1949|   61|13.208328|  [61.0]|11.851903567068867|
| 442640|1947|   59|13.000512|  [59.0]|11.638939958846493|
+-------+----+-----+---------+--------+------------------+
only showing top 10 rows

r2 is 0.8130096202685115
