<a href="https://colab.research.google.com/github/syedmahmoodiagents/BigData/blob/main/Logistic_Regression.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# !pip install pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, count, isnan, lit, when

In [None]:
from pyspark.ml.torch.distributor import TorchDistributor

In [None]:
from google.colab import files

In [None]:
# Start (or reuse) the notebook's Spark session under the app name 'Practice'.
sp = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [None]:
# files.upload()

In [None]:
# Toy training set: every tuple is laid out in the order given by `columns`.
columns = ["Label", "Feature1", "Feature2", "Feature3"]
data = [
    (0.0, 1.0, 1.0, 0.0),
    (1.0, 2.0, 1.0, 1.0),
    (0.0, 3.0, 2.0, 0.0),
    (1.0, 4.0, 2.0, 1.0),
    (0.0, 5.0, 3.0, 0.0),
    (1.0, 6.0, 3.0, 1.0),
]

# Build the Spark DataFrame from the in-memory rows and print it.
df = sp.createDataFrame(data, schema=columns)
df.show()

+-----+--------+--------+--------+
|Label|Feature1|Feature2|Feature3|
+-----+--------+--------+--------+
|  0.0|     1.0|     1.0|     0.0|
|  1.0|     2.0|     1.0|     1.0|
|  0.0|     3.0|     2.0|     0.0|
|  1.0|     4.0|     2.0|     1.0|
|  0.0|     5.0|     3.0|     0.0|
|  1.0|     6.0|     3.0|     1.0|
+-----+--------+--------+--------+



In [None]:
# Packs the three feature columns into one vector column named 'features'.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["Feature1", "Feature2", "Feature3"],
)

In [None]:
# Append the assembled 'features' vector column to the toy frame and display
# the result, so the table recorded under this cell is reproduced on a
# fresh Run All.
assembled_data = assembler.transform(df)
assembled_data.show()

+-----+--------+--------+--------+-------------+
|Label|Feature1|Feature2|Feature3|     features|
+-----+--------+--------+--------+-------------+
|  0.0|     1.0|     1.0|     0.0|[1.0,1.0,0.0]|
|  1.0|     2.0|     1.0|     1.0|[2.0,1.0,1.0]|
|  0.0|     3.0|     2.0|     0.0|[3.0,2.0,0.0]|
|  1.0|     4.0|     2.0|     1.0|[4.0,2.0,1.0]|
|  0.0|     5.0|     3.0|     0.0|[5.0,3.0,0.0]|
|  1.0|     6.0|     3.0|     1.0|[6.0,3.0,1.0]|
+-----+--------+--------+--------+-------------+



In [None]:
# Unfitted logistic-regression estimator: reads the 'features' vector column
# and learns to predict the 'Label' column.
lr = LogisticRegression(
    labelCol='Label',
    featuresCol='features',
)

In [None]:
# Fit the estimator on the assembled frame; the result is the trained model.
lr_model = lr.fit(dataset=assembled_data)

In [None]:
# Display the fitted model's coefficient vector (shown as the cell's output).
lr_model.coefficients

DenseVector([1.6449, -3.2072, 36.2958])

In [None]:
# Display the fitted model's intercept term (shown as the cell's output).
lr_model.intercept

-17.49083298237878

In [None]:
# Score the same frame the model was fitted on; the model's output columns
# are appended to the input columns.
predictions = lr_model.transform(dataset=assembled_data)

In [None]:
# Compare the true label with the predicted class and class probabilities,
# printing the probability vectors in full.
(
    predictions
    .select(["Label", "prediction", "probability"])
    .show(truncate=False)
)

+-----+----------+-----------------------------------------+
|Label|prediction|probability                              |
+-----+----------+-----------------------------------------+
|0.0  |0.0       |[0.9999999946868134,5.313186557387439E-9]|
|1.0  |1.0       |[6.268752659147127E-9,0.9999999937312474]|
|0.0  |0.0       |[0.9999999942287736,5.771226385675732E-9]|
|1.0  |1.0       |[5.77122624986663E-9,0.9999999942287737] |
|0.0  |0.0       |[0.9999999937312471,6.268752850147052E-9]|
|1.0  |1.0       |[5.313186567862795E-9,0.9999999946868134]|
+-----+----------+-----------------------------------------+



In [None]:
# Chain feature assembly and the classifier into one estimator.
pipe = Pipeline(
    stages=[
        assembler,  # builds the 'features' vector column
        lr,         # logistic regression on that column
    ]
)

In [None]:
# Fit the whole pipeline directly on the raw (un-assembled) frame.
ppmodel = pipe.fit(dataset=df)

In [None]:
# Run the fitted pipeline over the raw frame to obtain predictions.
predict = ppmodel.transform(dataset=df)

In [None]:
# Print the pipeline's predictions without shortening long cells
# (n=20 is show()'s default row limit, spelled out here).
predict.show(n=20, truncate=False)

+-----+--------+--------+--------+-------------+----------------------------------------+-----------------------------------------+----------+
|Label|Feature1|Feature2|Feature3|features     |rawPrediction                           |probability                              |prediction|
+-----+--------+--------+--------+-------------+----------------------------------------+-----------------------------------------+----------+
|0.0  |1.0     |1.0     |0.0     |[1.0,1.0,0.0]|[19.053074058278817,-19.053074058278817]|[0.9999999946868134,5.313186557387439E-9]|0.0       |
|1.0  |2.0     |1.0     |1.0     |[2.0,1.0,1.0]|[-18.887688433758672,18.887688433758672]|[6.268752659147127E-9,0.9999999937312474]|1.0       |
|0.0  |3.0     |2.0     |0.0     |[3.0,2.0,0.0]|[18.97038124044345,-18.97038124044345]  |[0.9999999942287736,5.771226385675732E-9]|0.0       |
|1.0  |4.0     |2.0     |1.0     |[4.0,2.0,1.0]|[-18.970381251594038,18.970381251594038]|[5.77122624986663E-9,0.9999999942287737] |1.0       |

In [None]:
# Persist the pipeline definition to the 'pipe_model' directory,
# replacing any earlier save at that path.
(
    pipe
    .write()
    .overwrite()
    .save("pipe_model")
)

In [None]:
 # Read the saved pipeline definition back from the 'pipe_model' directory.
 ppl = Pipeline.load(path='pipe_model')

In [None]:
 # The reloaded object is a Pipeline, so it is fitted again on the toy frame.
 ppl_model = ppl.fit(dataset=df)

In [None]:
# Predictions from the model fitted out of the reloaded pipeline.
pred = ppl_model.transform(dataset=df)

In [None]:
# Print the reloaded pipeline's predictions without shortening long cells
# (n=20 is show()'s default row limit, spelled out here).
pred.show(n=20, truncate=False)

+-----+--------+--------+--------+-------------+----------------------------------------+-----------------------------------------+----------+
|Label|Feature1|Feature2|Feature3|features     |rawPrediction                           |probability                              |prediction|
+-----+--------+--------+--------+-------------+----------------------------------------+-----------------------------------------+----------+
|0.0  |1.0     |1.0     |0.0     |[1.0,1.0,0.0]|[19.053074058278817,-19.053074058278817]|[0.9999999946868134,5.313186557387439E-9]|0.0       |
|1.0  |2.0     |1.0     |1.0     |[2.0,1.0,1.0]|[-18.887688433758672,18.887688433758672]|[6.268752659147127E-9,0.9999999937312474]|1.0       |
|0.0  |3.0     |2.0     |0.0     |[3.0,2.0,0.0]|[18.97038124044345,-18.97038124044345]  |[0.9999999942287736,5.771226385675732E-9]|0.0       |
|1.0  |4.0     |2.0     |1.0     |[4.0,2.0,1.0]|[-18.970381251594038,18.970381251594038]|[5.77122624986663E-9,0.9999999942287737] |1.0       |

In [None]:
# files.upload()

In [None]:
# Load the uploaded CSV, treating the first line as the header and letting
# Spark infer each column's type.
data = (
    sp.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('data.csv')
)

In [None]:
# Column expression that keeps 'Market Category' unless it equals the
# placeholder 'N/A', in which case the value becomes NULL. The original cell
# used the unbound names `column` and `value`; they are bound here to the
# concrete column and placeholder that the following cells work with.
when(col('Market Category') != 'N/A', col('Market Category')).otherwise(lit(None))

In [None]:
def replace(column, value):
    """Return a Column that is NULL wherever ``column`` equals ``value``.

    Rows for which the comparison ``column != value`` holds keep their
    original entry; every remaining row falls through to the NULL literal.
    """
    keeps_original = column != value
    null_literal = lit(None)
    return when(keeps_original, column).otherwise(null_literal)

In [None]:
# Rewrite 'Market Category' so the 'N/A' placeholder becomes NULL.
data = data.withColumn(
    colName='Market Category',
    col=replace(col('Market Category'), 'N/A'),
)

In [None]:
# Preview the frame. Selecting every column through an identity
# comprehension is the same as showing the frame itself.
data.show()


+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
|Make|     Model|Year|    Engine Fuel Type|Engine HP|Engine Cylinders|Transmission Type|    Driven_Wheels|Number of Doors|     Market Category|Vehicle Size|Vehicle Style|highway MPG|city mpg|Popularity| MSRP|
+----+----------+----+--------------------+---------+----------------+-----------------+-----------------+---------------+--------------------+------------+-------------+-----------+--------+----------+-----+
| BMW|1 Series M|2011|premium unleaded ...|      335|               6|           MANUAL| rear wheel drive|              2|Factory Tuner,Lux...|     Compact|        Coupe|         26|      19|      3916|46135|
| BMW|  1 Series|2011|premium unleaded ...|      300|               6|           MANUAL| rear wheel drive|              2|  Luxury,Performance|     Compact|  Conver

In [None]:
# Count missing values per column. isnan() is only meaningful for
# float/double columns, so it is applied to those alone; every other column
# is checked for NULL only, which avoids relying on an implicit
# string -> double cast of text columns.
data.select([
    count(
        when(
            (isnan(col(c)) | col(c).isNull())
            if dtype in ("float", "double")
            else col(c).isNull(),
            c,
        )
    ).alias(c)
    for c, dtype in data.dtypes
]).show()

In [None]:
# Remove the 'Market Category' column. The column name must be written with
# plain ASCII quotes; typographic quotes are a SyntaxError in Python.
data = data.drop("Market Category")

In [None]:
# Discard rows that contain a missing value (default 'any' policy).
data = data.dropna()

In [None]:
# Report the frame's shape as a (row count, column count) tuple.
print(
    (
        data.count(),
        len(data.columns),
    )
)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# --- Session -----------------------------------------------------------------
# getOrCreate() hands back the running session if the notebook already has one.
spark = (
    SparkSession.builder
    .appName("LogisticRegressionExample")
    .getOrCreate()
)

# --- Sample data ---------------------------------------------------------------
# Each tuple is (Feature1, Feature2, Label), matching `columns`.
columns = ["Feature1", "Feature2", "Label"]
data = [
    (1.0, 2.0, 1),
    (2.0, 3.0, 0),
    (3.0, 4.0, 1),
    (4.0, 5.0, 0),
    (5.0, 6.0, 1),
]
df = spark.createDataFrame(data, schema=columns)

# --- Feature assembly ----------------------------------------------------------
# Merge the two feature columns into the 'features' vector column used below.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["Feature1", "Feature2"],
)
assembled_data = assembler.transform(df)

# --- Train / test split --------------------------------------------------------
# 70/30 split with a fixed seed so the partition is repeatable.
train_data, test_data = assembled_data.randomSplit([0.7, 0.3], seed=42)

# --- Model -----------------------------------------------------------------------
lr = LogisticRegression(labelCol='Label', featuresCol='features')
lr_model = lr.fit(train_data)

# --- Predictions -----------------------------------------------------------------
predictions = lr_model.transform(test_data)
predictions.show()

# --- Evaluation ------------------------------------------------------------------
# Two evaluators over the same raw scores: area under the ROC curve and
# area under the precision-recall curve.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    labelCol="Label",
    rawPredictionCol="rawPrediction",
)
auc = evaluator.evaluate(predictions)
print(f"Area Under ROC: {auc}")

evaluator_pr = BinaryClassificationEvaluator(
    metricName="areaUnderPR",
    labelCol="Label",
    rawPredictionCol="rawPrediction",
)
pr = evaluator_pr.evaluate(predictions)
print(f"Area Under Precision-Recall: {pr}")



+--------+--------+-----+---------+-------------+-----------+----------+
|Feature1|Feature2|Label| features|rawPrediction|probability|prediction|
+--------+--------+-----+---------+-------------+-----------+----------+
|     3.0|     4.0|    1|[3.0,4.0]|   [-0.0,0.0]|  [0.5,0.5]|       0.0|
+--------+--------+-----+---------+-------------+-----------+----------+

Area Under ROC: 1.0
Area Under Precision-Recall: 1.0


In [None]:
import torch
import torch.nn as nn
from torch.optim import Adam

In [None]:
import os

In [None]:
# Rendezvous address and port that torch.distributed reads from the environment.
os.environ.update(
    {
        "MASTER_ADDR": "localhost",  # Set to your machine's IP if running on a cluster
        "MASTER_PORT": "12345",
    }
)

In [None]:
# This notebook is a single process, so it must be rank 0 of a world of
# size 1. Joining as rank 1 of 2 blocks until a rank-0 peer appears, and
# none ever does, which is the recorded socket timeout. The guard keeps the
# cell re-runnable, because initialising the default group twice raises.
if not torch.distributed.is_initialized():
    torch.distributed.init_process_group("gloo", rank=0, world_size=1)

# Only pin a CUDA device when one exists, and use index 0: index 1 needs a
# second GPU.
if torch.cuda.is_available():
    torch.cuda.set_device(0)


DistNetworkError: The client socket has timed out after 1800s while trying to connect to (localhost, 12345).

In [None]:

class SimpleModel(nn.Module):
    """Two-layer fully connected network: 784 -> 256 (ReLU) -> 10."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(in_features=784, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=10)

    def forward(self, x):
        """Apply fc1, a ReLU, then fc2, and return fc2's output."""
        hidden = nn.functional.relu(self.fc1(x))
        return self.fc2(hidden)

def train_simple_model(learning_rate=0.001, num_epochs=10):
    """Train a fresh SimpleModel inside the process started by TorchDistributor.

    The function is handed to ``TorchDistributor.run``, so the model and
    optimizer are created here rather than on the driver.

    Args:
        learning_rate: Step size passed to Adam.
        num_epochs: Number of optimisation steps over the batch.

    Returns:
        The trained model's ``state_dict``.
    """
    worker_model = SimpleModel()
    worker_optimizer = Adam(worker_model.parameters(), lr=learning_rate)
    loss_fn = nn.CrossEntropyLoss()

    # NOTE(review): the notebook defines no training data, so a random batch
    # stands in for it. Replace with a real DataLoader before relying on the
    # trained weights.
    inputs = torch.randn(64, 784)
    targets = torch.randint(0, 10, (64,))

    for _ in range(num_epochs):
        worker_optimizer.zero_grad()
        loss = loss_fn(worker_model(inputs), targets)
        loss.backward()
        worker_optimizer.step()

    return worker_model.state_dict()


# TorchDistributor is configured with process/GPU settings only; it accepts
# no `model` or `optimizer` argument and has no `train` method. The training
# function and its arguments go to run().
# NOTE(review): with num_processes > 1 the training function would also need
# to set up the process group and wrap the model for distributed training.
distributor = TorchDistributor(num_processes=1, local_mode=True, use_gpu=False)
trained_state = distributor.run(train_simple_model, num_epochs=10)

# Driver-side model carrying the trained weights, plus an optimizer bound to
# it (kept so `model` and `optimizer` remain defined as before).
model = SimpleModel()
model.load_state_dict(trained_state)
optimizer = Adam(model.parameters(), lr=0.001)


TypeError: TorchDistributor.__init__() got an unexpected keyword argument 'model'