In [3]:
import os
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [5]:
spark = SparkSession.builder \
    .appName("PySpark Example") \
    .master("local[*]") \
    .getOrCreate()

df = spark.read.csv("data/Subjects.csv", header=True, inferSchema=True)
df = df.withColumn("total", col("kor") + col("eng") + col("math") + col("science"))
df.show()

+-----+----+---+---+----+-------+-----+
|class|name|kor|eng|math|science|total|
+-----+----+---+---+----+-------+-----+
|    1| aaa| 67| 87|  90|     98|  342|
|    1| bbb| 45| 45|  56|     98|  244|
|    1| ccc| 95| 59|  96|     88|  338|
|    1| ddd| 65| 94|  89|     98|  346|
|    1| eee| 45| 65|  78|     98|  286|
|    1| fff| 78| 76|  98|     89|  341|
|    2| ggg| 87| 67|  65|     56|  275|
|    2| hhh| 89| 98|  78|     78|  343|
|    2| iii|100| 78|  56|     65|  299|
|    2| jjj| 99| 89|  87|     87|  362|
|    2| kkk| 98| 45|  56|     54|  253|
|    2| lll| 65| 89|  87|     78|  319|
+-----+----+---+---+----+-------+-----+



In [6]:
# 컬럼추가 : 총점
df = df.withColumn("total", col("kor") + col("eng") + col("math") + col("science"))
df.show()

# 평균점수 계산
df.groupBy().avg("total").show()

# 조건 필터링: 총점 300 이상 합격
df.filter(col("total") >= 300).show()
df["name","total"].filter(col("total") >= 300).show()

+-----+----+---+---+----+-------+-----+
|class|name|kor|eng|math|science|total|
+-----+----+---+---+----+-------+-----+
|    1| aaa| 67| 87|  90|     98|  342|
|    1| bbb| 45| 45|  56|     98|  244|
|    1| ccc| 95| 59|  96|     88|  338|
|    1| ddd| 65| 94|  89|     98|  346|
|    1| eee| 45| 65|  78|     98|  286|
|    1| fff| 78| 76|  98|     89|  341|
|    2| ggg| 87| 67|  65|     56|  275|
|    2| hhh| 89| 98|  78|     78|  343|
|    2| iii|100| 78|  56|     65|  299|
|    2| jjj| 99| 89|  87|     87|  362|
|    2| kkk| 98| 45|  56|     54|  253|
|    2| lll| 65| 89|  87|     78|  319|
+-----+----+---+---+----+-------+-----+

+-----------------+
|       avg(total)|
+-----------------+
|312.3333333333333|
+-----------------+

+-----+----+---+---+----+-------+-----+
|class|name|kor|eng|math|science|total|
+-----+----+---+---+----+-------+-----+
|    1| aaa| 67| 87|  90|     98|  342|
|    1| ccc| 95| 59|  96|     88|  338|
|    1| ddd| 65| 94|  89|     98|  346|
|    1| fff| 78| 7

In [7]:
spark.stop()
print("SparkSession 종료")

SparkSession 종료


In [8]:
# PySpark 데이터 분석 + 머신러닝(MLlib) 실습 예제

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression



In [9]:
## SparkSession 생성
# - SparkSession은 PySpark의 메인 진입점
# - master("Local[*]") --> CPU 코어 전체 사용

spark = SparkSession.builder \
    .appName("PySpark_ML_Example") \
    .master("local[*]") \
    .getOrCreate()

print("SparkSession 생성 완료")

SparkSession 생성 완료


In [10]:
df = spark.read.csv("data/Subjects.csv", header=True, inferSchema=True)
# - header=True : 첫 번째 행을 컬럼명으로 사용
# - inferSchema=True : 데이터 타입 자동 추론

print("CSV 파일 로드 완료")
df.show()


CSV 파일 로드 완료
+-----+----+---+---+----+-------+
|class|name|kor|eng|math|science|
+-----+----+---+---+----+-------+
|    1| aaa| 67| 87|  90|     98|
|    1| bbb| 45| 45|  56|     98|
|    1| ccc| 95| 59|  96|     88|
|    1| ddd| 65| 94|  89|     98|
|    1| eee| 45| 65|  78|     98|
|    1| fff| 78| 76|  98|     89|
|    2| ggg| 87| 67|  65|     56|
|    2| hhh| 89| 98|  78|     78|
|    2| iii|100| 78|  56|     65|
|    2| jjj| 99| 89|  87|     87|
|    2| kkk| 98| 45|  56|     54|
|    2| lll| 65| 89|  87|     78|
+-----+----+---+---+----+-------+



In [11]:
# 데이터 전처리 및 파생변수 생성
# - 결측값은 0으로 대체
# 총점 및 합격여부 컬럼 생성

df = df.fillna({"kor": 0, "eng": 0, "math": 0, "science": 0})
df = df.withColumn("total", col("kor") + col("eng") + col("math") + col("science"))

# 1. 평균 총점 계산 및 파이썬 변수에 값 추출
avg_df = df.agg(avg("total").alias("avg_total"))
avg_score = avg_df.collect()[0]["avg_total"]

# (선택 사항) 평균값 확인
df.groupby().avg("total").show()
print(f"합격 기준 (평균 총점): {avg_score:.2f}")


# 2. 평균 이상 합격 여부 컬럼 생성
df = df.withColumn("pass", when(col("total") >= avg_score, 1).otherwise(0))
print("데이터 처리 완료")
df.show()

+-----------------+
|       avg(total)|
+-----------------+
|312.3333333333333|
+-----------------+

합격 기준 (평균 총점): 312.33
데이터 처리 완료
+-----+----+---+---+----+-------+-----+----+
|class|name|kor|eng|math|science|total|pass|
+-----+----+---+---+----+-------+-----+----+
|    1| aaa| 67| 87|  90|     98|  342|   1|
|    1| bbb| 45| 45|  56|     98|  244|   0|
|    1| ccc| 95| 59|  96|     88|  338|   1|
|    1| ddd| 65| 94|  89|     98|  346|   1|
|    1| eee| 45| 65|  78|     98|  286|   0|
|    1| fff| 78| 76|  98|     89|  341|   1|
|    2| ggg| 87| 67|  65|     56|  275|   0|
|    2| hhh| 89| 98|  78|     78|  343|   1|
|    2| iii|100| 78|  56|     65|  299|   0|
|    2| jjj| 99| 89|  87|     87|  362|   1|
|    2| kkk| 98| 45|  56|     54|  253|   0|
|    2| lll| 65| 89|  87|     78|  319|   1|
+-----+----+---+---+----+-------+-----+----+



In [12]:
# 회귀모델
# 입력 피처: kor,eng,math,science
# 타겟 : total
# 목적: 총점 예측 회귀모델

assembler = VectorAssembler(
    inputCols = ["kor","eng","math","science"],
    outputCol="features"
)
'''
Spark ML은 feature를 벡터형태로 받아야 학습 가능
여러 컬럼(f1,f2..)도 한 컬럼(feature)에 합침
'''

train_df = assembler.transform(df).select("features", "total") # x, y set

'''
featureCol : 독립 변수 벡터 컬럼
labelCol : 예측하려는 종속 변수
.fit(df) -> 회귀계수(weight)와 절편(intercept) 학습
'''
# 회귀 모델 생성 및 학습
lr = LinearRegression(featuresCol="features", labelCol="total")
lr_model = lr.fit(train_df)

# 예측 수행
lr_predictions = lr_model.transform(train_df)
print("Linear Regression 모델 학습 완료")

lr_predictions.select("features", "total", "prediction").show(5)


Linear Regression 모델 학습 완료
+--------------------+-----+------------------+
|            features|total|        prediction|
+--------------------+-----+------------------+
|[67.0,87.0,90.0,9...|  342| 342.0000000000004|
|[45.0,45.0,56.0,9...|  244|243.99999999999918|
|[95.0,59.0,96.0,8...|  338|338.00000000000153|
|[65.0,94.0,89.0,9...|  346| 346.0000000000002|
|[45.0,65.0,78.0,9...|  286|285.99999999999903|
+--------------------+-----+------------------+
only showing top 5 rows



In [13]:
# Logistic Regression 분류 모델

assembler2 = VectorAssembler(
    inputCols = ["kor","eng","math","science"],
    outputCol="features"
)
'''
Spark ML은 feature를 벡터형태로 받아야 학습 가능
여러 컬럼(f1,f2..)도 한 컬럼(feature)에 합침
'''

train_df2 = assembler2.transform(df).select("features", "pass")

lgr = LogisticRegression(featuresCol="features", labelCol="pass")
lgr_model = lgr.fit(train_df2)

# 예측 수행
lgr_predictions = lgr_model.transform(train_df2)
print("Logistic Regression 모델 학습 완료")

lgr_predictions.select("features", "pass", "prediction", "probability").show(5)


Logistic Regression 모델 학습 완료
+--------------------+----+----------+--------------------+
|            features|pass|prediction|         probability|
+--------------------+----+----------+--------------------+
|[67.0,87.0,90.0,9...|   1|       1.0|[2.81944823643108...|
|[45.0,45.0,56.0,9...|   0|       0.0|           [1.0,0.0]|
|[95.0,59.0,96.0,8...|   1|       1.0|[1.24529696368858...|
|[65.0,94.0,89.0,9...|   1|       1.0|[1.85271882249817...|
|[45.0,65.0,78.0,9...|   0|       0.0|[0.99999997698823...|
+--------------------+----+----------+--------------------+
only showing top 5 rows



In [14]:
# SparkSession 종료

spark.stop()
print("Sparksession stop")


Sparksession stop


In [1]:
# MongoDB 연동

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

spark = SparkSession.builder \
    .appName("MongoDB_Spark_Test") \
    .master("local[*]") \
    .config("spark.jars", "C:/spark/bin/mongo-spark-connector_2.12-10.2.0.jar") \
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1:27017/testdb.students") \
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/testdb.students_result") \
    .getOrCreate()

print("SparkSession 생성 완료")


Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:1139)
	at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:1125)
	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:489)
	at org.apache.spark.executor.Executor.$anonfun$updateDependencies$14(Executor.scala:1163)
	at org.apache.spark.executor.Executor.$anonfun$updateDependencies$14$adapted(Executor.scala:1155)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
	at scala.collection.immutable.Map$Map1.foreach(Map.scala:193)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
	at org.apache.spark.executor.Executor.updateDependencies(Executor.scala:1155)
	at org.apache.spark.executor.Executor.<init>(Executor.scala:330)
	at org.apache.spark.scheduler.local.LocalEndpoint.<init>(LocalSchedulerBackend.scala:64)
	at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:132)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:235)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:604)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more


In [None]:
df = spark.read \
    .format("mongodb") \
    .option("uri", "mongodb://localhost:27017/testdb.students") \
    .load()

print(" MongoDB 데이터 로드 완료")
df.show()

In [None]:
df = df.withColumn("total", col("kor") + col("eng") + col("math") + col("science"))
df = df.withColumn("pass", when(col("total") >= 300, 1).otherwise(0))
df.show()

In [None]:
df.write \
    .format("mongodb") \
    .mode("overwrite") \
    .save()
print(" MongoDB 저장 완료")

In [2]:
spark.stop()
print(" SparkSession 종료")

NameError: name 'spark' is not defined

In [3]:
import pandas as pd
import numpy as np

np.random.seed(42)

dates = pd.date_range('2024-01-01','2024-12-31')

regions = ['서울','부산','대구','광주','인천']
products = ['A','B','C','D']

data = {
    'date':np.random.choice(dates, 1000),
    'region': np.random.choice(regions, 1000),
    'product': np.random.choice(products, 1000),
    'sales': np.random.randint(100, 1000, 1000),
    'profit': np.random.randint(10, 200, 1000)
}

df = pd.DataFrame(data)

df = df.sort_values('date').reset_index(drop=True)

df.to_csv('sales_data.csv', index=False, encoding='utf-8-sig')

print("'sales_data.csv' 파일이 생성되었습니다.")
print(df.head())

'sales_data.csv' 파일이 생성되었습니다.
        date region product  sales  profit
0 2024-01-01     광주       B    982     173
1 2024-01-01     광주       C    134     124
2 2024-01-01     부산       D    148     123
3 2024-01-01     광주       A    745      74
4 2024-01-01     서울       B    389     147
