# Initialization

In [1]:
# Initialization: fill in the appname parameter with your username and the dataset used in this session
%run spark_openalex_init.py --appname=MarioZZJ_OpenAlex

You can open the Spark UI link printed above in a new tab to monitor job progress, or go to the Spark UI main page to see multiple apps running in parallel.

# Viewing the schema

The init script has now created 27 `pyspark.sql.dataframe.DataFrame` objects, one per table. Use the `printSchema()` method to view the fields of each one.

Variable names of the 27 objects: `Authors`, `AuthorsIds`, `AuthorsCountsByYear`, `Concepts`, `ConceptsAncestors`, `ConceptsCountsByYear`, `ConceptsIds`, `ConceptsRelatedConcepts`, `Institutions`, `InstitutionsAssociatedInstitutions`, `InstitutionsCountsByYear`, `InstitutionsGeo`, `InstitutionsIds`, `Works`, `WorksAuthorships`, `WorksAlternateHostVenues`, `WorksBiblio`, `WorksConcepts`, `WorksHostVenues`, `WorksIds`, `WorksMesh`, `WorksOpenAccess`, `WorksRelatedWorks`, `WorksReferencedWorks`, `Venues`, `VenuesCountsByYear`, `VenuesIds`.

For the detailed schema of each table, see the [OpenAlex documentation](https://docs.openalex.org/download-snapshot/upload-to-your-database/load-to-a-relational-database/postgres-schema-diagram).
![schema](https://2520693015-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FSj6S26Opvy3KVj3QQGMc%2Fuploads%2FS3eqe0lsHYmqxJTcPa9V%2Fopenalex-schema.png?alt=media&token=64f070a8-ca96-4639-96d7-06d8a6ea659d)

> Note: some field names in the diagram are misspelled, e.g. [display_name_acronyms](https://gist.github.com/richard-orr/152d828356a7c47ed7e3e22d2253708d?permalink_comment_id=4366952#gistcomment-4366952). Treat the schema you actually read from the data as authoritative.

In [2]:
Authors.printSchema()

root
 |-- id: long (nullable = true)
 |-- orcid: string (nullable = true)
 |-- display_name: string (nullable = true)
 |-- display_name_alternatives: string (nullable = true)
 |-- works_count: integer (nullable = true)
 |-- cited_by_count: integer (nullable = true)
 |-- last_known_institution: string (nullable = true)
 |-- works_api_url: string (nullable = true)
 |-- updated_date: date (nullable = true)
 |-- display_name_alternatives_array: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [3]:
Works

DataFrame[id: bigint, doi: string, title: string, display_name: string, publication_year: int, publication_date: string, type: string, cited_by_count: int, is_retracted: boolean, is_paratext: boolean, host_venue: string]

# Example: fuzzy search
Similar to `WHERE ... LIKE` in SQL.

In [4]:
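# Find 10 works whose title contains 'covid'
works = Works.filter(
    # filter() takes a boolean expression: == for an exact match, like() for pattern matching
    # like() is case-sensitive, so titles spelled 'COVID' are not matched by '%covid%'
    Works.display_name.like('%covid%')
).limit(10).toPandas()  # limit() keeps exploration cheap; toPandas() collects the rows into a pandas DataFrame shown in the notebook
works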

|  | id | doi | title | display_name | publication_year | publication_date | type | cited_by_count | is_retracted | is_paratext | host_venue |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 4206974766 | https://doi.org/10.14393/ufu.di.2022.24 | A atuação da UFU frente a pandemia de covid-19... | A atuação da UFU frente a pandemia de covid-19... | 2022 | 2022-01-24 | dissertation | 0 | False | False | https://api.openalex.org/works?filter=cites:W4... |
| 1 | 4210273141 | https://doi.org/10.7476/9786557081587.0027 | A experiência do Proqualis na produção e divul... | A experiência do Proqualis na produção e divul... | 2022 | 2022-01-26 | book-chapter | 0 | False | False | https://api.openalex.org/works?filter=cites:W4... |
| 2 | 4210584131 | https://doi.org/10.1016/j.shaw.2021.12.1275 | Which jobs are lucky against the “biologic” an... | Which jobs are lucky against the “biologic” an... | 2022 | 2022-01-01 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W4... |
| 3 | 3216835453 | https://doi.org/10.47573/aya.88580.2.48.12 | O impacto da covid-19 no sistema de obras públ... | O impacto da covid-19 no sistema de obras públ... | 2021 | 2021-11-30 | book-chapter | 0 | False | False | https://api.openalex.org/works?filter=cites:W3... |
| 4 | 3216821236 | https://doi.org/10.1080/09647775.2021.2002509 | Centering the museum: writings for the post-co... | Centering the museum: writings for the post-co... | 2021 | 2021-11-29 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W3... |
| 5 | 4200127697 | https://doi.org/10.1136/bmj.n2989 | Could it be covid? Update information to stop ... | Could it be covid? Update information to stop ... | 2021 | 2021-12-03 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W4... |
| 6 | 4200006623 | https://doi.org/10.31080/asol.2021.03.0362 | Post-covid Rhino Cerebral Mucor Mycosis: Neuro... | Post-covid Rhino Cerebral Mucor Mycosis: Neuro... | 2021 | 2021-12-01 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W4... |
| 7 | 4200077283 | https://doi.org/10.1016/j.euroneuro.2021.10.199 | P.0206 Mental health during covid-19 infection | P.0206 Mental health during covid-19 infection | 2021 | 2021-12-01 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W4... |
| 8 | 4200263580 | https://doi.org/10.1016/j.euroneuro.2021.10.249 | P.0259 Topic modeling analysis of publications... | P.0259 Topic modeling analysis of publications... | 2021 | 2021-12-01 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W4... |
| 9 | 4200419006 | https://doi.org/10.1590/scielopreprints.3380 | Como superar a estagnação da curva de cobertur... | Como superar a estagnação da curva de cobertur... | 2021 | 2021-12-20 | posted-content | 0 | False | False | https://api.openalex.org/works?filter=cites:W4... |
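Spark's `like()` follows SQL `LIKE` rules: `%` matches any run of characters (including none), `_` matches exactly one character, and the pattern must cover the whole string, case-sensitively. The plain-Python sketch below mimics those rules on a few made-up titles to show why `'%covid%'` misses a title written `COVID-19`. It only illustrates the semantics; it is not Spark's implementation and it ignores LIKE's escape character.

```python
import re

def like_to_regex(pattern: str) -> re.Pattern:
    """Translate a SQL LIKE pattern into a compiled regex (escape characters not supported)."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")  # % matches any sequence of characters, including an empty one
        elif ch == "_":
            parts.append(".")   # _ matches exactly one character
        else:
            parts.append(re.escape(ch))  # everything else is matched literally
    return re.compile("".join(parts), re.DOTALL)

def like(value: str, pattern: str) -> bool:
    """True if the whole value matches the pattern, case-sensitively, as in SQL LIKE."""
    return like_to_regex(pattern).fullmatch(value) is not None

# Made-up titles for illustration only
titles = [
    "Mental health during covid-19",
    "COVID-19 vaccine uptake",
    "Deep learning for protein folding",
]

matches = [t for t in titles if like(t, "%covid%")]             # case-sensitive, like Works.display_name.like('%covid%')
matches_ci = [t for t in titles if like(t.lower(), "%covid%")]  # lowercase first for a case-insensitive match
print(matches)     # ['Mental health during covid-19']
print(matches_ci)  # ['Mental health during covid-19', 'COVID-19 vaccine uptake']
```

In Spark itself, the same case-insensitive variant can be written by lowercasing the column first, e.g. `Works.filter(F.lower(Works.display_name).like('%covid%'))`, where `F` is `pyspark.sql.functions`.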


# Example: table join
Similar to `JOIN` in SQL.

In [5]:
WorksReferencedWorks

DataFrame[work_id: bigint, referenced_work_id: bigint]

In [6]:
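# Find (up to) 10 references cited by work 3213651567
selected_citations = Works.filter(Works.id == 3213651567).join(
    WorksReferencedWorks,                      # 1st argument: the table to join
    Works.id == WorksReferencedWorks.work_id,  # 2nd argument: the join condition
    how='inner')                               # 3rd argument: the join type, here an inner join
selected_citations.limit(10).toPandas()  # until an action such as toPandas() or show() runs, the variable holds only the query plan and nothing is computed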

|  | id | doi | title | display_name | publication_year | publication_date | type | cited_by_count | is_retracted | is_paratext | host_venue | work_id | referenced_work_id |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 3213651567 | https://doi.org/10.48102/pi.v29i3.414 | Psicología basada en la evidencia en tiempos d... | Psicología basada en la evidencia en tiempos d... | 2021 | 2021-11-09 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W3... | 3213651567 | 1493925022 |
| 1 | 3213651567 | https://doi.org/10.48102/pi.v29i3.414 | Psicología basada en la evidencia en tiempos d... | Psicología basada en la evidencia en tiempos d... | 2021 | 2021-11-09 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W3... | 3213651567 | 1765504387 |
| 2 | 3213651567 | https://doi.org/10.48102/pi.v29i3.414 | Psicología basada en la evidencia en tiempos d... | Psicología basada en la evidencia en tiempos d... | 2021 | 2021-11-09 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W3... | 3213651567 | 3039521750 |
| 3 | 3213651567 | https://doi.org/10.48102/pi.v29i3.414 | Psicología basada en la evidencia en tiempos d... | Psicología basada en la evidencia en tiempos d... | 2021 | 2021-11-09 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W3... | 3213651567 | 3081416251 |
| 4 | 3213651567 | https://doi.org/10.48102/pi.v29i3.414 | Psicología basada en la evidencia en tiempos d... | Psicología basada en la evidencia en tiempos d... | 2021 | 2021-11-09 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W3... | 3213651567 | 3094350895 |
| 5 | 3213651567 | https://doi.org/10.48102/pi.v29i3.414 | Psicología basada en la evidencia en tiempos d... | Psicología basada en la evidencia en tiempos d... | 2021 | 2021-11-09 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W3... | 3213651567 | 3132890237 |
| 6 | 3213651567 | https://doi.org/10.48102/pi.v29i3.414 | Psicología basada en la evidencia en tiempos d... | Psicología basada en la evidencia en tiempos d... | 2021 | 2021-11-09 | journal-article | 0 | False | False | https://api.openalex.org/works?filter=cites:W3... | 3213651567 | 3172256592 |
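An inner join keeps only rows whose key appears on both sides and emits one output row per matching pair. That is why the result above repeats the same `id`, `doi` and title on every row: work 3213651567 is paired once with each of its references. The toy sketch below shows the same behavior with a simple hash join in plain Python. The rows are invented, and Spark picks its own physical strategy (for example a broadcast or sort-merge join) rather than running this exact code.

```python
from collections import defaultdict

def inner_join(left, right, left_key, right_key):
    """Hash inner join of two lists of dicts on left[left_key] == right[right_key]."""
    index = defaultdict(list)
    for r in right:  # build phase: group right-hand rows by their key
        index[r[right_key]].append(r)
    joined = []
    for l in left:   # probe phase: one output row per matching pair
        for r in index.get(l[left_key], []):
            joined.append({**l, **r})  # keeps both key columns, like id and work_id above
    return joined

# Invented toy rows mirroring the Works / WorksReferencedWorks columns
works = [
    {"id": 1, "title": "Paper A"},
    {"id": 2, "title": "Paper B"},
    {"id": 3, "title": "Paper C"},  # cites nothing, so the inner join drops it
]
referenced = [
    {"work_id": 1, "referenced_work_id": 10},
    {"work_id": 1, "referenced_work_id": 11},
    {"work_id": 2, "referenced_work_id": 12},
]

for row in inner_join(works, referenced, "id", "work_id"):
    print(row)
```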


# Example: aggregation
Similar to `GROUP BY` in SQL.

In [7]:
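from pyspark.sql import functions as F  # F is presumably already provided by the init script; importing it again is harmless

# Count the distinct MeSH descriptors (descriptor_ui) attached to work 2513796179
selected_meshes = (
    WorksMesh.filter(WorksMesh.work_id == 2513796179)
    .groupBy(WorksMesh.work_id)
    # agg() applies aggregate functions; countDistinct() counts unique values (see pyspark.sql.functions for more)
    .agg(F.countDistinct(WorksMesh.descriptor_ui).alias("Mesh_D_Count"))  # alias() names the new column
)

selected_meshes.toPandas()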

|  | work_id | Mesh_D_Count |
|---|---|---|
| 0 | 2513796179 | 7 |
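`countDistinct()` counts the unique non-null values in each group, whereas `F.count()` on the same column counts every non-null value, duplicates included. The difference matters if, as the use of `countDistinct()` suggests, one work can list the same `descriptor_ui` in several rows (for example, once per MeSH qualifier). The plain-Python sketch below reproduces both aggregations on invented rows; the `qualifier_ui` field and all values are assumptions made for illustration.

```python
from collections import defaultdict

def count_distinct_by(rows, group_key, value_key):
    """Per group, count distinct non-null values of value_key (like groupBy().agg(countDistinct()))."""
    distinct = defaultdict(set)
    for row in rows:
        bucket = distinct[row[group_key]]  # create the group even if its value is null
        if row[value_key] is not None:     # countDistinct ignores nulls
            bucket.add(row[value_key])
    return {group: len(values) for group, values in distinct.items()}

def count_by(rows, group_key, value_key):
    """Per group, count non-null values of value_key, duplicates included (like F.count())."""
    counts = defaultdict(int)
    for row in rows:
        counts[row[group_key]] += (row[value_key] is not None)
    return dict(counts)

# Invented rows: work 1 lists descriptor D001 twice, with two different qualifiers
mesh_rows = [
    {"work_id": 1, "descriptor_ui": "D001", "qualifier_ui": "Q01"},
    {"work_id": 1, "descriptor_ui": "D001", "qualifier_ui": "Q02"},
    {"work_id": 1, "descriptor_ui": "D002", "qualifier_ui": None},
    {"work_id": 2, "descriptor_ui": "D003", "qualifier_ui": None},
]

print(count_distinct_by(mesh_rows, "work_id", "descriptor_ui"))  # {1: 2, 2: 1}
print(count_by(mesh_rows, "work_id", "descriptor_ui"))           # {1: 3, 2: 1}
```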


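# Example: column pruning
Similar to `SELECT` in SQL. Most research tasks need only a few fields of each table, and selecting just those columns as early as possible reduces the amount of data Spark has to process.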

In [8]:
# Take the first 10 rows of Works, keeping only the id and doi columns
works_sub = Works.select(
                Works.id,
                Works.doi
)
works_sub.limit(10).toPandas()

|  | id | doi |
|---|---|---|
| 0 | 3143064787 | https://doi.org/10.18800/psico.202001.010 |
| 1 | 3144438434 | https://doi.org/10.20878/cshr.2019.25.12.020 |
| 2 | 3146640534 | https://doi.org/10.17759/sps.2020110105 |
| 3 | 3015405040 | https://doi.org/10.3726/b16812 |
| 4 | 3148351627 | https://doi.org/10.1590/0034-7167-2018-0192 |
| 5 | 3149532180 | https://doi.org/10.25178/nit.2019.1.2 |
| 6 | 2949504104 | https://doi.org/10.1097/01.hjh.0000570716.9836... |
| 7 | 3150456942 | https://doi.org/10.31866/2616-745x.4.2019.177614 |
| 8 | 3150791969 | https://doi.org/10.32431/kace.2020.23.1.005 |
| 9 | 3151025289 | https://doi.org/10.35180/gse |


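# Example: saving data
Intermediate results sometimes need to be saved. If the data is small, call `toPandas()` and save the resulting pandas DataFrame with pandas' own methods (e.g. `to_csv()`). For larger data, use Spark's `DataFrame.write` interface instead: each partition is written to its own file, so the data never has to be collected into the driver's memory.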

In [9]:
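# Take 1,000 rows of Works with only the id and doi columns and save them to a folder
# header=True writes a header row into every part file (recommended, as it makes reading back easier)
# mode="overwrite" replaces the folder if it already exists
works_sub.limit(1000).write.csv("./data/id_doi", header=True, mode="overwrite")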

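The `./data/id_doi` folder now contains several `part-*.csv` files (typically alongside a `_SUCCESS` marker file) that together hold all the selected rows.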
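Because `header=True` puts a header line at the top of every part file, each file can also be read on its own, even outside Spark. The sketch below reads all part files of such a folder with Python's standard `csv` module. It assumes the files match `part-*.csv` (Spark typically names them like `part-00000-<uuid>-c000.csv`) and builds a throwaway demo folder with invented rows instead of touching `./data/id_doi`.

```python
import csv
import glob
import os
import tempfile

def read_csv_parts(directory):
    """Read every part-*.csv file in a Spark output folder; each file is assumed to start with its own header."""
    rows = []
    for path in sorted(glob.glob(os.path.join(directory, "part-*.csv"))):  # _SUCCESS does not match the pattern
        with open(path, newline="", encoding="utf-8") as f:
            rows.extend(csv.DictReader(f))  # each file's header line supplies the dict keys
    return rows

# Demo only: fake a Spark output folder with two part files and a _SUCCESS marker
demo_dir = tempfile.mkdtemp()
for name, body in [
    ("part-00000-demo-c000.csv", "id,doi\n1,https://doi.org/10.1000/a\n"),
    ("part-00001-demo-c000.csv", "id,doi\n2,https://doi.org/10.1000/b\n"),
    ("_SUCCESS", ""),
]:
    with open(os.path.join(demo_dir, name), "w", encoding="utf-8") as f:
        f.write(body)

rows = read_csv_parts(demo_dir)
print(rows)
```

Unlike `spark.read.csv(..., inferSchema=True)`, the `csv` module returns every value as a string, so numeric columns such as `id` would need explicit conversion.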

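# Example: reading data
The sharded output saved above can be loaded back into a `pyspark.sql.dataframe.DataFrame` through Spark's `spark.read` interface for further processing. Because `limit()` without `orderBy()` does not guarantee which rows are returned, the rows read back need not match the ten shown in the column-pruning example.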

In [10]:
works_sub = spark.read.csv("./data/id_doi", header=True, inferSchema=True)
print(type(works_sub))
works_sub.limit(10).toPandas()

<class 'pyspark.sql.dataframe.DataFrame'>


|  | id | doi |
|---|---|---|
| 0 | 4205445286 | https://doi.org/10.1109/auteee52864.2021.9668613 |
| 1 | 4205722486 | https://doi.org/10.1201/9781003181613-17 |
| 2 | 4205932699 | https://doi.org/10.37715/rmbe.v1i2.2414 |
| 3 | 4205601445 | https://doi.org/10.1093/hepl/9780198840626.003... |
| 4 | 4205604766 | https://doi.org/10.1088/1755-1315/949/1/012104 |
| 5 | 4205398072 | https://doi.org/10.3390/molecules27010277 |
| 6 | 4205115136 | https://doi.org/10.1051/bioconf/20224301019 |
| 7 | 4205115748 | https://doi.org/10.24917/20831765.16.2 |
| 8 | 4205790373 | https://doi.org/10.7311/acta.57.2021.03 |
| 9 | 4205181778 | https://doi.org/10.1109/icic54025.2021.9632982 |


For more usage, see the [PySpark documentation](https://spark.apache.org/docs/3.1.2/api/python/getting_started/index.html).

# Important: release the session

Please make a habit of releasing the resources you use: when you are finished, run the statements below, or shut down the notebook, which also frees the resources.

In [11]:
spark.sparkContext.stop()
spark.stop()