In [73]:
import numpy as np
from sklearn.datasets import make_blobs
from sklearn.linear_model import LogisticRegression

In [74]:
# 构造一些数据点
centers = [[-5, 0], [0, 1.5]]
X, y = make_blobs(n_samples=1000, centers=centers, random_state=40)
transformation = [[0.4, 0.2], [-0.4, 1.2]]
X = np.dot(X, transformation)
X

array([[-2.38519938,  0.02641654],
       [-1.74830343, -1.4226907 ],
       [-2.45431681, -1.46071435],
       ...,
       [-1.88691344, -1.67526794],
       [-0.96714666,  4.62965124],
       [-1.78872907,  3.53964501]])

In [75]:
import pandas as pd
df_X = pd.DataFrame(X)
df_X.head()

Unnamed: 0,0,1
0,-2.385199,0.026417
1,-1.748303,-1.422691
2,-2.454317,-1.460714
3,-1.125613,1.839601
4,-1.88564,0.015078


In [76]:
y
df_y = pd.DataFrame(y)
df_y.head()

Unnamed: 0,0
0,0
1,0
2,0
3,1
4,0


In [77]:
df=pd.concat([df_X,df_y],axis=1)
df.columns = ['x1','x2','y']
df.head()

Unnamed: 0,x1,x2,y
0,-2.385199,0.026417,0
1,-1.748303,-1.422691,0
2,-2.454317,-1.460714,0
3,-1.125613,1.839601,1
4,-1.88564,0.015078,0


In [78]:
df.shape

(1000, 3)

In [94]:
# -*- coding: utf-8 -*-
from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# GBDT算法
# from sklearn.ensemble import GradientBoostingClassifier

# 随机拆分训练集与测试集
# train_x, test_x, train_y, test_y = train_test_split(df.iloc[:, :2], df.iloc[:, 2], test_size = 0.2)
x, y = df.iloc[:, :2], df.iloc[:, 2]
# 逻辑回归分类算法
lr = LogisticRegression()

# 训练模型
# lr.fit(train_x, train_y)
lr.fit(x, y)

# 预测
# predict_y = lr.predict(test_x)
# print(predict_y)

# 模型得分
score = lr.score(x, y)
print(score)
# sklearn —— lr的intercept
print('intercept:', lr.intercept_)
print('coef', lr.coef_)

0.994
intercept: [5.26633759]
coef [[4.83543236 2.40770116]]


In [95]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from sklearn.datasets import load_iris
import pandas
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import  VectorAssembler
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.feature import RFormula

spark = SparkSession \
    .builder \
    .appName("LogisticRegressionSummary") \
    .getOrCreate()

In [96]:
spark_df = spark.createDataFrame(df)
spark_df.show(5)

+-------------------+--------------------+---+
|                 x1|                  x2|  y|
+-------------------+--------------------+---+
| -2.385199377182053| 0.02641654153650963|  0|
| -1.748303425101968| -1.4226907036418683|  0|
|-2.4543168092113743| -1.4607143493286543|  0|
|-1.1256129399722343|  1.8396010662311215|  1|
|-1.8856403930555201|0.015077984693015356|  0|
+-------------------+--------------------+---+
only showing top 5 rows



In [97]:
spark_df.describe(['x1','x2']).show()

+-------+-------------------+------------------+
|summary|                 x1|                x2|
+-------+-------------------+------------------+
|  count|               1000|              1000|
|   mean|-1.3192309800110293|0.4906036582218552|
| stddev| 0.8709577892170187|1.8635558731525261|
|    min|-4.0920421418646775| -4.88341015185696|
|    max| 1.2690910070542518| 5.406472714716774|
+-------+-------------------+------------------+



In [98]:
# 提取特征与目标
fomula = RFormula(formula = 'y ~ .')
raw_df = fomula.fit(spark_df).transform(spark_df)
raw_df.show(5)

+-------------------+--------------------+---+--------------------+-----+
|                 x1|                  x2|  y|            features|label|
+-------------------+--------------------+---+--------------------+-----+
| -2.385199377182053| 0.02641654153650963|  0|[-2.3851993771820...|  0.0|
| -1.748303425101968| -1.4226907036418683|  0|[-1.7483034251019...|  0.0|
|-2.4543168092113743| -1.4607143493286543|  0|[-2.4543168092113...|  0.0|
|-1.1256129399722343|  1.8396010662311215|  1|[-1.1256129399722...|  1.0|
|-1.8856403930555201|0.015077984693015356|  0|[-1.8856403930555...|  0.0|
+-------------------+--------------------+---+--------------------+-----+
only showing top 5 rows



In [99]:
# 拆分训练集和测试集
# train_df, test_df = raw_df.randomSplit([0.8, 0.2])

# 创建LR分类器
lr = LogisticRegression()

# 训练
# train_df.show(5)

In [100]:
# model = lr.fit(train_df)
model = lr.fit(raw_df)

In [101]:
# 预测test集合
# predict_df = model.transform(test_df)
predict_df = model.transform(raw_df)
predict_df.show(5)

+-------------------+--------------------+---+--------------------+-----+--------------------+--------------------+----------+
|                 x1|                  x2|  y|            features|label|       rawPrediction|         probability|prediction|
+-------------------+--------------------+---+--------------------+-----+--------------------+--------------------+----------+
| -2.385199377182053| 0.02641654153650963|  0|[-2.3851993771820...|  0.0|[12.5191484349171...|[0.99999634404089...|       0.0|
| -1.748303425101968| -1.4226907036418683|  0|[-1.7483034251019...|  0.0|[12.4363591611965...|[0.99999602848582...|       0.0|
|-2.4543168092113743| -1.4607143493286543|  0|[-2.4543168092113...|  0.0|[19.8131630172131...|[0.99999999751542...|       0.0|
|-1.1256129399722343|  1.8396010662311215|  1|[-1.1256129399722...|  1.0|[-8.3739689146985...|[2.30744461788483...|       1.0|
|-1.8856403930555201|0.015077984693015356|  0|[-1.8856403930555...|  0.0|[7.46891633132487...|[0.99942977909231

In [102]:
predict_df = model.transform(raw_df)
predict_df.show(5)

+-------------------+--------------------+---+--------------------+-----+--------------------+--------------------+----------+
|                 x1|                  x2|  y|            features|label|       rawPrediction|         probability|prediction|
+-------------------+--------------------+---+--------------------+-----+--------------------+--------------------+----------+
| -2.385199377182053| 0.02641654153650963|  0|[-2.3851993771820...|  0.0|[12.5191484349171...|[0.99999634404089...|       0.0|
| -1.748303425101968| -1.4226907036418683|  0|[-1.7483034251019...|  0.0|[12.4363591611965...|[0.99999602848582...|       0.0|
|-2.4543168092113743| -1.4607143493286543|  0|[-2.4543168092113...|  0.0|[19.8131630172131...|[0.99999999751542...|       0.0|
|-1.1256129399722343|  1.8396010662311215|  1|[-1.1256129399722...|  1.0|[-8.3739689146985...|[2.30744461788483...|       1.0|
|-1.8856403930555201|0.015077984693015356|  0|[-1.8856403930555...|  0.0|[7.46891633132487...|[0.99942977909231

In [103]:
predict_df.select('probability').toPandas().head()

Unnamed: 0,probability
0,"[0.9999963440408975, 3.655959102346e-06]"
1,"[0.9999960284858289, 3.971514170951432e-06]"
2,"[0.9999999975154221, 2.4845779777244565e-09]"
3,"[0.00023074446178848393, 0.9997692555382116]"
4,"[0.9994297790923127, 0.0005702209076873891]"


In [104]:
wxb = -(10.209934496107765*(-1.1256129399722343) + 
 4.430228817925033*1.8396010662311215 +  
 11.71654964268366)
big = 2.718281828459 ** wxb
big / (1+big)

0.00023074446178851627

In [105]:
# 对测试集做predict, 生成(预测分类, 正确分类)
def build_predict_target(row):
    return (float(row.prediction), float(row.y))

predict_and_target_rdd = predict_df.rdd.map(build_predict_target)

In [106]:
# 统计模型效果
metrics = BinaryClassificationMetrics(predict_and_target_rdd)
print(metrics.areaUnderPR)

0.9930239520958084


In [107]:
# spark的系数与截距
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

Coefficients: [10.209934496107765,4.430228817925033]
Intercept: 11.71654964268366


In [93]:
list(model.coefficients)
# model.intercept

[10.209934496107765, 4.430228817925033]

In [72]:
test_df = test_df.drop()

In [None]:
weight = pd.DataFrame(lr.coef_, columns=X.columns)
weight = pd.concat([weight, pd.DataFrame(lr.intercept_, columns=['intercept'])], axis=1)

# 增加id列是为了解决权重后期相乘计算问题
weight = pd.concat([weight, pd.DataFrame(np.array([1]), columns=['id'])], axis=1)


In [31]:
train_df.columns

['x1', 'x2', 'y', 'features', 'label']

In [None]:
wxb = -(10.209934496107765*(-2.385199377182053) + 
 4.430228817925033*0.02641654153650963 +  
 11.71654964268366)
big = 2.718281828459 ** wxb
big / (1+big)

In [None]:
w_list = [-0.7816175677294058,0.5477111013627193,7.6443351623435545,-0.5748801759905937,-0.42027007731042154,0.29231073398711066,4.272040352801078,-0.34739586823420554,0.2145981403766586,-0.40415069197636677,0.12383300125220871,0.34573268366153476,0.06080121567168719,-0.3450963962713452,0.11033544831366296,0.30794444793466413,0.5375354678582245,0.04823501613857324,0.11626184143697531,0.13765776209775113,-0.13334708891100816,-0.011552788830795174,0.38278599210120395,-2.615143604484861,-0.6625679839416263,0.08666411747007477,-0.8133271454491996]
Intercept = -3.1160763612649744
len(w_list)

In [None]:
import pandas as pd 
data = pd.read_csv("parameterVerificationLR.csv")
data.head()

In [None]:
first_line = data.loc[1,:][1:-5].values.tolist()
len(first_line)
molecule =2.718281828459 ** (-(sum([first_line[i] * w_list[i] for i in range(len(first_line))])+Intercept))
molecule / (1 + molecule)

In [None]:
from __future__ import print_function 
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler

spark=SparkSession.builder.appName('StandScalerExample').getOrCreate()
dataFrame=spark.createDataFrame([(0.0,Vectors.dense([1.0,0.1,-8.0]),),
                                 (1.0,Vectors.dense([2.0,1.0,-4.0]),),
                                 (1.0,Vectors.dense([4.0,10.0,8.0]),)],['label','features'])
#按特征列减均值除标准差——标准化
scaler=StandardScaler(inputCol='features',outputCol='scaledFeatures',withStd=False,withMean=True)
scalerModel=scaler.fit(dataFrame)
scaledData=scalerModel.transform(dataFrame)
scaledData.show(truncate=False)

In [None]:
scaledData.select(['label', 'scaledFeatures']).show()

In [None]:
train_df.select('features').show(truncate=False)
scaler=StandardScaler(inputCol='features',outputCol='scaledFeatures',withStd=False,withMean=True)
scalerModel=scaler.fit(train_df)
scaledData=scalerModel.transform(train_df)
scaledData.show(truncate=False)

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType
dataFrame.show()
df = dataFrame
def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)
column = ['a', 'b', 'c']
df = df.withColumn("xs", to_array(col("features"))) \
            .select(["label"] + [col("xs")[i] for i in range(3)])
df.show()

In [None]:
dataFrame.show()
s = dataFrame.drop('label')
s.show()

In [None]:
import pyspark.sql.functions as fn

sd = df.withColumn("id", fn.monotonically_increasing_id())
sd.show()

In [None]:
sd.select(['label', 'id']).show()

In [None]:
sd.show()
sd = sd.drop('label', 'xs[0]')
sd.show()

In [None]:
output.show()

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType
df = output.select(['features', 'label'])
def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)
df = df.withColumn("xs", to_array(col("features"))) \
            .select(["label"] + [col("xs")[i] for i in range(3)])
df.show()

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MaxAbsScaler

def columnProcessing(data,columnLabel,allColumn):
    
    #删除目标列
    allColumn.remove(columnLabel)
    
    assembler = VectorAssembler(
      inputCols=[columnLabel], outputCol="features"
    )

    data = assembler.transform(data)
#     scaler=MaxAbsScaler(inputCol='features',outputCol='scaledFeatures').fit(data)
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",withStd=True, withMean=True).fit(data)
    data = scaler.transform(data)

    
    def to_array(col):
        def to_array_(v):
            return v.toArray().tolist()
        return udf(to_array_, ArrayType(DoubleType()))(col)
    data = data \
                .withColumn(columnLabel, to_array(col("scaledFeatures"))) \
                .select(allColumn + [col(columnLabel)[0] for i in range(1)]) \
                .withColumnRenamed('%s[0]'%columnLabel,columnLabel)
    #           .select(["xs[0]", "xs[1]", "xs[2]"] + [col("label")[i] for i in range(1)])

    return data
for i in df.columns:
    if i == 'label':
        continue
    df = columnProcessing(df, i,df.columns)
df.show()

In [None]:
def recommed_index_visit_company_sku_class1_cross_feature(self):
        '''
        用户与曝光的交叉特征
        '''
        data = self.spark.sql('''
                    select * from search_data.recommed_index_visit_company_sku_class1_cross_feature
                ''')
        return data
        
        # 删除包含空行的值
        data = data.na.drop()

        return data