In [1]:
sc.addPyFile("/Users/xujinwen/spark/spark-3.3.1-bin-hadoop2/jars/graphframes-0.8.2-spark3.0-s_2.12.jar")

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from graphframes import *

MongoDB Spark Connect

Step1: Connect to MongoDB instances\
Step2: Start Spark Shell from the command line
    
    pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.sparkify1200" \
              --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.sparkify1200" \
              --packages org.mongodb.spark:mongo-spark-connector_2.12:10.1.1

Step3: Create a SparkSession Object, specified the spark.mongodb.input.uri and spark.mongodb.output.uri configuration options. Use SparkSession.builder and specify different configuration options to create your own SparkSession object.

- The spark.mongodb.input.uri specifies the MongoDB server address (127.0.0.1), the database to connect (test), and the collection (myCollection) from which to read data, and the read preference.
- The spark.mongodb.output.uri specifies the MongoDB server address (127.0.0.1), the database to connect (test), and the collection (myCollection) to which to write data. Connects to port 27017 by default.
- The packages option specifies the Spark Connector's Maven coordinates, in the format groupId:artifactId:version.

-> Uncomment below code to use MongoDB

In [None]:
# from pyspark.sql import SparkSession
# spark = SparkSession \
#     .builder \
#     .appName("myApp") \
#     .config("spark.mongodb.input.uri", "mongodb://@127.0.0.1:27017/test.sparkify1200") \
#     .config("spark.mongodb.output.uri", "mongodb://@127.0.0.1:27017/test.sparkify1200") \
#     .getOrCreate()

# #load data
# data = spark.read.format("mongodb").option("header","true").option('escape','"').load()
# original_count = data.count()
# data.take(1)

In [3]:
# Create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify_Network") \
    .getOrCreate()
# Set time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

23/05/29 15:08:32 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
# Load dataset
path = "data/mini_sparkify_event_data.json"
data = spark.read.json(path)

                                                                                

In [5]:
from pyspark.sql.functions import size, collect_set
df = data.select("artist", "userId") \
                    .groupBy("artist") \
                    .agg(collect_set(col('userId')).alias('user_list')) \
                    .select('*',size('user_list').alias('size')) \
                    .filter("size>=150") \
                    .filter("artist != 'null'") \
                    .orderBy("size", ascending = False)
df.count()

                                                                                

37

In [6]:
df.show()

+--------------------+--------------------+----+
|              artist|           user_list|size|
+--------------------+--------------------+----+
|       Kings Of Leon|[120, 66, 300013,...| 199|
|       Dwight Yoakam|[120, 66, 300013,...| 189|
|            Coldplay|[120, 66, 141, 15...| 189|
|Florence + The Ma...|[120, 66, 300013,...| 187|
|            BjÃÂ¶rk|[120, 66, 300013,...| 179|
|      The Black Keys|[120, 66, 300013,...| 179|
|       Justin Bieber|[120, 66, 141, 15...| 177|
|        Taylor Swift|[120, 66, 141, 15...| 173|
|        Jack Johnson|[120, 66, 300013,...| 173|
|     Alliance Ethnik|[66, 120, 300013,...| 172|
|            Harmonia|[120, 66, 15, 154...| 172|
|       Guns N' Roses|[120, 66, 300013,...| 170|
|               Train|[120, 66, 141, 15...| 169|
|              Eminem|[120, 66, 141, 15...| 169|
|         The Killers|[120, 66, 141, 15...| 168|
|         OneRepublic|[66, 120, 300013,...| 168|
|           Metallica|[66, 120, 300013,...| 168|
|           Radiohea

In [7]:
song_vertices = df.select("artist", "size").withColumnRenamed("artist","id")
song_vertices.show()

+--------------------+----+
|                  id|size|
+--------------------+----+
|       Kings Of Leon| 199|
|       Dwight Yoakam| 189|
|            Coldplay| 189|
|Florence + The Ma...| 187|
|            BjÃÂ¶rk| 179|
|      The Black Keys| 179|
|       Justin Bieber| 177|
|        Taylor Swift| 173|
|        Jack Johnson| 173|
|     Alliance Ethnik| 172|
|            Harmonia| 172|
|       Guns N' Roses| 170|
|               Train| 169|
|              Eminem| 169|
|         The Killers| 168|
|         OneRepublic| 168|
|           Metallica| 168|
|           Radiohead| 167|
|                Muse| 166|
|          John Mayer| 166|
+--------------------+----+
only showing top 20 rows



In [8]:
# 获取艺术家列表
artists = df.select("artist").distinct().rdd.flatMap(lambda x: x).collect()

# 定义结果数据框架的模式
result_schema = StructType([
    StructField("artist1", StringType(), True),
    StructField("artist2", StringType(), True),
    StructField("common_listeners", IntegerType(), True)
])

# 创建空的结果数据框架
result = spark.createDataFrame([], result_schema)

# 遍历艺术家列表并找到共同的听众列表
for i in range(len(artists)):
    for j in range(i + 1, len(artists)):
        artist1 = artists[i]
        artist2 = artists[j]
        listeners1 = set(df.filter(col("artist") == artist1).select("user_list").first()[0])
        listeners2 = set(df.filter(col("artist") == artist2).select("user_list").first()[0])
        common_listeners = len(listeners1.intersection(listeners2))
        if common_listeners > 0:
                result = result.union(spark.createDataFrame([(artist1, artist2, common_listeners)], ["artist1", "artist2", "common_listeners"]))


# 显示结果
result.show()

23/05/29 15:11:55 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
23/05/29 15:11:55 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
23/05/29 15:11:55 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
23/05/29 15:11:55 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
23/05/29 15:11:57 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
+-------------+--------------------+----------------+
|      artist1|             artist2|common_listeners|
+-------------+--------------------+----------------+
|Kings Of Leon|            Coldplay|             181|
|Kings Of Leon|       Dwight Yoakam|             181|
|Kings Of Leon|Florence + The Ma...|             179|
|Kings Of Leon|      The Black Keys|             176|
|Kings Of Leon|            BjÃÂ¶rk|             172|
|Kings Of Leon|       Justin Bieber|             173|
|Kings Of Leon|        Taylor Swift|             170|
|Kings Of Leon|        Jack Jo



                                                                                

In [9]:
song_edges = result.toDF("src","dst","weight")
song_edges.show()

23/05/29 15:13:45 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
23/05/29 15:13:45 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
23/05/29 15:13:46 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
23/05/29 15:13:46 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
23/05/29 15:13:47 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
+-------------+--------------------+------+
|          src|                 dst|weight|
+-------------+--------------------+------+
|Kings Of Leon|            Coldplay|   181|
|Kings Of Leon|       Dwight Yoakam|   181|
|Kings Of Leon|Florence + The Ma...|   179|
|Kings Of Leon|      The Black Keys|   176|
|Kings Of Leon|            BjÃÂ¶rk|   172|
|Kings Of Leon|       Justin Bieber|   173|
|Kings Of Leon|        Taylor Swift|   170|
|Kings Of Leon|        Jack Johnson|   167|
|Kings Of Leon|            Harmonia|   166|
|Kings Of Leon|     Alliance Ethnik|   167

In [10]:
edges = song_edges.toPandas()

# 导出为 CSV 文件
edges.to_csv("edges.csv", index=False)

23/05/29 15:14:10 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB


                                                                                

In [11]:
song_graph = GraphFrame(song_vertices, song_edges)



In [12]:
song_graph.vertices.count()



                                                                                

37

In [13]:
song_graph.outDegrees.show()



23/05/29 17:43:43 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB




23/05/29 17:44:31 WARN DAGScheduler: Broadcasting large task binary with size 2.1 MiB
+--------------------+---------+
|                  id|outDegree|
+--------------------+---------+
|       Kings Of Leon|       36|
|            Coldplay|       35|
|       Dwight Yoakam|       34|
|Florence + The Ma...|       33|
|      The Black Keys|       32|
|            BjÃÂ¶rk|       31|
|       Justin Bieber|       30|
|        Taylor Swift|       29|
|        Jack Johnson|       28|
|            Harmonia|       27|
|     Alliance Ethnik|       26|
|       Guns N' Roses|       25|
|              Eminem|       24|
|               Train|       23|
|         The Killers|       22|
|           Metallica|       21|
|         OneRepublic|       20|
|           Radiohead|       19|
|          John Mayer|       18|
|         Evanescence|       17|
+--------------------+---------+
only showing top 20 rows




                                                                                