In [1]:
!pip list

Package                   Version
------------------------- -----------
anyio                     4.10.0
argon2-cffi               21.3.0
argon2-cffi-bindings      25.1.0
asttokens                 3.0.0
async-lru                 2.0.5
attrs                     25.4.0
babel                     2.16.0
backcall                  0.2.0
beautifulsoup4            4.13.5
bleach                    6.2.0
Bottleneck                1.4.2
brotlicffi                1.0.9.2
certifi                   2025.10.5
cffi                      2.0.0
charset-normalizer        3.4.4
colorama                  0.4.6
comm                      0.2.1
debugpy                   1.8.16
decorator                 5.2.1
defusedxml                0.7.1
dnspython                 2.7.0
exceptiongroup            1.3.0
executing                 2.2.1
fastjsonschema            2.20.0
findspark                 2.0.1
h11                       0.16.0
httpcore                  1.0.9
httpx                     0.28.1
idna            

In [3]:
import os

# Tell PySpark which Python executable to launch for the workers and for the driver.
# The variable names must be spelled exactly PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON;
# any other spelling is silently ignored by Spark.
os.environ['PYSPARK_PYTHON'] = "python"
os.environ['PYSPARK_DRIVER_PYTHON'] = "python"

In [None]:
# SparkSession is exported by pyspark.sql (pyspark.sql.utils does not provide it).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [4]:
# Create (or reuse) a Spark session named 'PySpark Example' with master "local[*]".
spark = (
    SparkSession.builder
    .appName('PySpark Example')
    .master("local[*]")
    .getOrCreate()
)

# Read the subjects CSV, treating the first row as the header and inferring column types.
df = spark.read.csv("data/Subjects.csv", header=True, inferSchema=True)

# Add a "total" column holding the sum of the four subject scores, then display the frame.
total_score = col("kor") + col("eng") + col("math") + col("science")
df = df.withColumn("total", total_score)
df.show()

+-----+----+---+---+----+-------+-----+
|class|name|kor|eng|math|science|total|
+-----+----+---+---+----+-------+-----+
|    1| aaa| 67| 87|  90|     98|  342|
|    1| bbb| 45| 45|  56|     98|  244|
|    1| ccc| 95| 59|  96|     88|  338|
|    1| ddd| 65| 94|  89|     98|  346|
|    1| eee| 45| 65|  78|     98|  286|
|    1| fff| 78| 76|  98|     89|  341|
|    2| ggg| 87| 67|  65|     56|  275|
|    2| hhh| 89| 98|  78|     78|  343|
|    2| iii|100| 78|  56|     65|  299|
|    2| jjj| 99| 89|  87|     87|  362|
|    2| kkk| 98| 45|  56|     54|  253|
|    2| lll| 65| 89|  87|     78|  319|
+-----+----+---+---+----+-------+-----+



In [10]:
# Add the column: "total" is the sum of the four subject scores.
score_sum = col("kor") + col("eng") + col("math") + col("science")
df = df.withColumn("total", score_sum)
df.show()

# Compute the average of "total" over all rows.
overall_average = df.groupBy().avg("total")
overall_average.show()

# Conditional filter: a total of 300 or more counts as a pass.
passing_rows = df.filter(col("total") >= 300)
passing_rows.show()

+-----+----+---+---+----+-------+-----+
|class|name|kor|eng|math|science|total|
+-----+----+---+---+----+-------+-----+
|    1| aaa| 67| 87|  90|     98|  342|
|    1| bbb| 45| 45|  56|     98|  244|
|    1| ccc| 95| 59|  96|     88|  338|
|    1| ddd| 65| 94|  89|     98|  346|
|    1| eee| 45| 65|  78|     98|  286|
|    1| fff| 78| 76|  98|     89|  341|
|    2| ggg| 87| 67|  65|     56|  275|
|    2| hhh| 89| 98|  78|     78|  343|
|    2| iii|100| 78|  56|     65|  299|
|    2| jjj| 99| 89|  87|     87|  362|
|    2| kkk| 98| 45|  56|     54|  253|
|    2| lll| 65| 89|  87|     78|  319|
+-----+----+---+---+----+-------+-----+

+-----------------+
|       avg(total)|
+-----------------+
|312.3333333333333|
+-----------------+

+-----+----+---+---+----+-------+-----+
|class|name|kor|eng|math|science|total|
+-----+----+---+---+----+-------+-----+
|    1| aaa| 67| 87|  90|     98|  342|
|    1| ccc| 95| 59|  96|     88|  338|
|    1| ddd| 65| 94|  89|     98|  346|
|    1| fff| 78| 7

In [12]:
# Create a temp view so the DataFrame can be queried by name from SQL.
df.createOrReplaceTempView("students")

# SQL query: name and total of every row whose total is at least 300.
high_score_query = "SELECT name, total FROM students WHERE total >= 300"
high_score = spark.sql(high_score_query)
high_score.show()

+----+-----+
|name|total|
+----+-----+
| aaa|  342|
| ccc|  338|
| ddd|  346|
| fff|  341|
| hhh|  343|
| jjj|  362|
| lll|  319|
+----+-----+



In [13]:
# Stop the SparkSession
spark.stop()

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression


In [18]:
# SparkSession: the main entry point. master("local[*]") runs Spark locally on all CPU cores.
# The master URL is matched case-sensitively, so it must be written in lowercase ("local[*]").
spark = (
    SparkSession.builder
    .appName("PySpark_ML_Example")
    .master("local[*]")
    .getOrCreate()
)


In [19]:
# Load the CSV file (first row used as column names, data types inferred automatically).
subjects_csv = 'data/Subjects.csv'
df = spark.read.csv(subjects_csv, header=True, inferSchema=True)

# Display the loaded rows.
df.show()

+-----+----+---+---+----+-------+
|class|name|kor|eng|math|science|
+-----+----+---+---+----+-------+
|    1| aaa| 67| 87|  90|     98|
|    1| bbb| 45| 45|  56|     98|
|    1| ccc| 95| 59|  96|     88|
|    1| ddd| 65| 94|  89|     98|
|    1| eee| 45| 65|  78|     98|
|    1| fff| 78| 76|  98|     89|
|    2| ggg| 87| 67|  65|     56|
|    2| hhh| 89| 98|  78|     78|
|    2| iii|100| 78|  56|     65|
|    2| jjj| 99| 89|  87|     87|
|    2| kkk| 98| 45|  56|     54|
|    2| lll| 65| 89|  87|     78|
+-----+----+---+---+----+-------+



In [28]:
# Preprocessing: replace missing subject scores with 0, then derive two columns:
#   total - sum of the four subject scores
#   pass  - 1 when total is at least 300, otherwise 0
subject_columns = ["kor", "eng", "math", "science"]
df = (
    df.fillna({subject: 0 for subject in subject_columns})
    .withColumn("total", col("kor") + col("eng") + col("math") + col("science"))
    .withColumn("pass", when(col("total") >= 300, 1).otherwise(0))
)

df.show()

+-----+----+---+---+----+-------+-----+----+
|class|name|kor|eng|math|science|total|pass|
+-----+----+---+---+----+-------+-----+----+
|    1| aaa| 67| 87|  90|     98|  342|   1|
|    1| bbb| 45| 45|  56|     98|  244|   0|
|    1| ccc| 95| 59|  96|     88|  338|   1|
|    1| ddd| 65| 94|  89|     98|  346|   1|
|    1| eee| 45| 65|  78|     98|  286|   0|
|    1| fff| 78| 76|  98|     89|  341|   1|
|    2| ggg| 87| 67|  65|     56|  275|   0|
|    2| hhh| 89| 98|  78|     78|  343|   1|
|    2| iii|100| 78|  56|     65|  299|   0|
|    2| jjj| 99| 89|  87|     87|  362|   1|
|    2| kkk| 98| 45|  56|     54|  253|   0|
|    2| lll| 65| 89|  87|     78|  319|   1|
+-----+----+---+---+----+-------+-----+----+



In [32]:
# Regression model / input features: all four subject scores / target: total.
# The assembler packs the listed columns into a single vector column named "features".
regression_inputs = ["kor", "eng", "math", "science"]
assembler = VectorAssembler(inputCols=regression_inputs, outputCol="features")

In [35]:
# Run the assembler over df (adds the "features" vector column) and display the result.
train_df = assembler.transform(df)
train_df.show()

+-----+----+---+---+----+-------+-----+----+--------------------+
|class|name|kor|eng|math|science|total|pass|            features|
+-----+----+---+---+----+-------+-----+----+--------------------+
|    1| aaa| 67| 87|  90|     98|  342|   1|[67.0,87.0,90.0,9...|
|    1| bbb| 45| 45|  56|     98|  244|   0|[45.0,45.0,56.0,9...|
|    1| ccc| 95| 59|  96|     88|  338|   1|[95.0,59.0,96.0,8...|
|    1| ddd| 65| 94|  89|     98|  346|   1|[65.0,94.0,89.0,9...|
|    1| eee| 45| 65|  78|     98|  286|   0|[45.0,65.0,78.0,9...|
|    1| fff| 78| 76|  98|     89|  341|   1|[78.0,76.0,98.0,8...|
|    2| ggg| 87| 67|  65|     56|  275|   0|[87.0,67.0,65.0,5...|
|    2| hhh| 89| 98|  78|     78|  343|   1|[89.0,98.0,78.0,7...|
|    2| iii|100| 78|  56|     65|  299|   0|[100.0,78.0,56.0,...|
|    2| jjj| 99| 89|  87|     87|  362|   1|[99.0,89.0,87.0,8...|
|    2| kkk| 98| 45|  56|     54|  253|   0|[98.0,45.0,56.0,5...|
|    2| lll| 65| 89|  87|     78|  319|   1|[65.0,89.0,87.0,7...|
+-----+---

In [36]:
# Assemble the feature vector, then keep only the model input ("features") and the target ("total").
assembled = assembler.transform(df)
train_df = assembled.select("features", "total")
train_df.show()

+--------------------+-----+
|            features|total|
+--------------------+-----+
|[67.0,87.0,90.0,9...|  342|
|[45.0,45.0,56.0,9...|  244|
|[95.0,59.0,96.0,8...|  338|
|[65.0,94.0,89.0,9...|  346|
|[45.0,65.0,78.0,9...|  286|
|[78.0,76.0,98.0,8...|  341|
|[87.0,67.0,65.0,5...|  275|
|[89.0,98.0,78.0,7...|  343|
|[100.0,78.0,56.0,...|  299|
|[99.0,89.0,87.0,8...|  362|
|[98.0,45.0,56.0,5...|  253|
|[65.0,89.0,87.0,7...|  319|
+--------------------+-----+



In [44]:
# featuresCol: vector of independent variables; labelCol: dependent variable.
# .fit() learns the regression coefficients (weights) and the intercept.
lr = LinearRegression(
    featuresCol="features",
    labelCol="total",
)
lr_model = lr.fit(train_df)

# Run predictions on the same frame the model was fitted on.
lr_predictions = lr_model.transform(train_df)
# lr_predictions.select("features", "total", "prediction").show(5)
lr_predictions.show()

+--------------------+-----+------------------+
|            features|total|        prediction|
+--------------------+-----+------------------+
|[67.0,87.0,90.0,9...|  342| 342.0000000000004|
|[45.0,45.0,56.0,9...|  244|243.99999999999918|
|[95.0,59.0,96.0,8...|  338|338.00000000000153|
|[65.0,94.0,89.0,9...|  346| 346.0000000000002|
|[45.0,65.0,78.0,9...|  286|285.99999999999903|
|[78.0,76.0,98.0,8...|  341|341.00000000000045|
|[87.0,67.0,65.0,5...|  275|274.99999999999847|
|[89.0,98.0,78.0,7...|  343| 343.0000000000002|
|[100.0,78.0,56.0,...|  299|298.99999999999994|
|[99.0,89.0,87.0,8...|  362|362.00000000000153|
|[98.0,45.0,56.0,5...|  253|252.99999999999912|
|[65.0,89.0,87.0,7...|  319| 318.9999999999987|
+--------------------+-----+------------------+



In [46]:
# Classification model (logistic regression) / inputs: subject-score features /
# target: pass flag / goal: classify pass vs. fail.
classification_inputs = ["kor", "eng", "math", "science"]
assembler2 = VectorAssembler(inputCols=classification_inputs, outputCol="features")

# Keep only the feature vector and the "pass" label for training.
assembled2 = assembler2.transform(df)
train_df2 = assembled2.select("features", "pass")

In [59]:
# Fit a logistic-regression classifier on the "features" vector with "pass" as the label.
logr = LogisticRegression(featuresCol="features", labelCol="pass")
logr_model = logr.fit(train_df2)

# Score the same frame the model was fitted on.
logr_predictions = logr_model.transform(train_df2)

# First the full prediction frame (truncated cells), then the key columns untruncated.
logr_predictions.show()
logr_predictions.select("features", "pass", "prediction", "probability").show(truncate=False)
# The former print(col("pass")) was removed: it only printed an unresolved Column
# expression's repr, not any data from the predictions.

+--------------------+----+--------------------+--------------------+----------+
|            features|pass|       rawPrediction|         probability|prediction|
+--------------------+----+--------------------+--------------------+----------+
|[67.0,87.0,90.0,9...|   1|[-19.686724629161...|[2.81944823643108...|       1.0|
|[45.0,45.0,56.0,9...|   0|[59.4855269309313...|           [1.0,0.0]|       0.0|
|[95.0,59.0,96.0,8...|   1|[-18.201306704974...|[1.24529696368858...|       1.0|
|[65.0,94.0,89.0,9...|   1|[-22.409196735736...|[1.85271882249817...|       1.0|
|[45.0,65.0,78.0,9...|   0|[17.5872602642891...|[0.99999997698823...|       0.0|
|[78.0,76.0,98.0,8...|   1|[-26.642164741617...|[2.68815846669923...|       1.0|
|[87.0,67.0,65.0,5...|   0|[17.9887252920324...|[0.99999998459733...|       0.0|
|[89.0,98.0,78.0,7...|   1|[-19.278335486291...|[4.24155669289329...|       1.0|
|[100.0,78.0,56.0,...|   0|[18.5685481242207...|[0.99999999137454...|       0.0|
|[99.0,89.0,87.0,8...|   1|[

In [60]:
# Stop the active SparkSession.
spark.stop()

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# spark = SparkSession.builder \
#     .appName("MongoDB_Spark_Test")\
#     .master("local[*]")\
#     .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.2.0")\
#     .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/testdb.students")\
#     .config("spark.mongodb.write.connection.uri", "mongodb://localhost:27017/testdb.students_result")\
#     .getOrCreate()

# my_spark = SparkSession \
#     .builder \
#     .appName("myApp") \
#     .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
#     .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/testdb.coll") \
#     .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/testdb.coll") \
#     .getOrCreate()

# Build a session with the MongoDB Spark connector available as the "mongodb" data source.
#
# The connector is pulled in through spark.jars.packages (Maven coordinates) rather than a
# single local jar, so its transitive dependencies (the MongoDB Java driver) are resolved too
# and no machine-specific absolute path is needed.
# The artifact's Scala suffix must match the Scala build of the Spark runtime.
# NOTE(review): _2.12 is chosen because the recorded traceback frames look like a Scala 2.12
# runtime — confirm with spark.sparkContext._jvm.scala.util.Properties.versionString().
# NOTE(review): the package list is read when the JVM is launched, so restart the kernel
# before running this cell if an earlier SparkSession was created in the same kernel.
spark = (
    SparkSession.builder
    .appName("MongoSparkExample")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.5.0")
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/testdb.students")
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/testdb.students")
    .getOrCreate()
)

In [36]:
# df = spark.read\
#     .format("mongo")\
#     # .option("uri", "mongodb://localhost:27017/testdb.students")\
#     .load()

# Load a DataFrame through the "mongodb" data source; no options are passed here, so the
# connection settings come from the session configuration.
mongo_reader = spark.read.format("mongodb")
df = mongo_reader.load()

# Report that the load call returned, then display the rows.
print("MongoDB 데이터 로드 완료")
df.show()

Py4JJavaError: An error occurred while calling o149.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongodb. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at jdk.internal.reflect.GeneratedMethodAccessor29.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: mongodb.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 14 more


In [69]:
# Derive the total of the four subject scores and a pass flag (1 when total >= 300, else 0).
subject_total = col("kor") + col("eng") + col("math") + col("science")
df = (
    df.withColumn("total", subject_total)
    .withColumn("pass", when(col("total") >= 300, 1).otherwise(0))
)

In [103]:
# Write the enriched rows back to MongoDB.
# The session's read and write URIs both point at testdb.students, and df is evaluated lazily
# from that collection. Overwriting the same collection risks replacing the source before it
# has been read, so the result goes to a separate collection instead.
# NOTE(review): the exact ordering of overwrite vs. read is understood from the connector's
# write-mode documentation — confirm before relying on it.
(
    df.write
    .format("mongodb")
    .mode("overwrite")
    .option("collection", "students_result")
    .save()
)

Py4JJavaError: An error occurred while calling o1978.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongodb. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: mongodb.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more
