Skip to content

Commit

Permalink
[dataflowengineoss] handle python imports in globalFromLiteral
Browse files Browse the repository at this point in the history
  • Loading branch information
xavierpinho committed May 20, 2024
1 parent d7af886 commit d6e9e81
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@ import io.shiftleft.semanticcpg.language.*

package object dataflowengineoss {

def globalFromLiteral(lit: Literal): Iterator[Expression] = lit.start.inCall.assignment
.where(_.method.isModule)
.argument(1)
def globalFromLiteral(lit: Literal): Iterator[Expression] = {
val relevantLiteral = lit.start.where(_.method.isModule).l

// Gets <x> from `<x> = <lit>`
val lhsOfAssignment = relevantLiteral.inCall.assignment.argument(1)

// Gets <x> from `<x> = import(...<lit>...)` because of how Python imports are represented
val lhsOfImport = relevantLiteral.inCall.name("import").inCall.assignment.argument(1)

lhsOfAssignment ++ lhsOfImport
}

def identifierToFirstUsages(node: Identifier): List[Identifier] = node.refsTo.flatMap(identifiersFromCapturedScopes).l

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,80 @@ class DataFlowTests extends PySrc2CpgFixture(withOssDataflow = true) {
typeDeclFullName shouldBe "models.py:<module>.Foo"
}

"flow from global variable defined in imported file and used as argument to `print`" in {
val cpg = code("""
|from models import FOOBAR
|print(FOOBAR)
|""".stripMargin)
.moreCode(
"""
|FOOBAR = "XYZ"
|""".stripMargin,
fileName = "models.py"
)
def sink = cpg.call("print").argument.argumentIndex(1)
def source = cpg.literal("\"XYZ\"")
val List(flow) = sink.reachableByFlows(source).map(flowToResultPairs).l
flow shouldBe List(("FOOBAR = \"XYZ\"", 2), ("FOOBAR = import(models, FOOBAR)", 2), ("print(FOOBAR)", 3))
}

"flow from global variable defined in imported file and used inside a method as argument to `print`" in {
val cpg = code("""
|from models import FOOBAR
|def run():
| print(FOOBAR)
|""".stripMargin)
.moreCode(
"""
|FOOBAR = "XYZ"
|""".stripMargin,
fileName = "models.py"
)

def sink = cpg.call("print").argument.argumentIndex(1)
def source = cpg.literal("\"XYZ\"")
val List(flow) = sink.reachableByFlows(source).map(flowToResultPairs).l
flow shouldBe List(("FOOBAR = \"XYZ\"", 2), ("FOOBAR = import(models, FOOBAR)", 2), ("print(FOOBAR)", 4))
}

"flow from global variable defined in imported file and used as argument to another module's imported method" in {
val cpg = code("""
|import service
|from models import FOOBAR
|def run():
| service.doThing(FOOBAR)
|""".stripMargin)
.moreCode(
"""
|FOOBAR = "XYZ"
|""".stripMargin,
fileName = "models.py"
)

def sink = cpg.call("doThing").argument.argumentIndex(1)
def source = cpg.literal("\"XYZ\"")
val List(flow) = sink.reachableByFlows(source).map(flowToResultPairs).distinct.sortBy(_.length).l
flow shouldBe List(("FOOBAR = \"XYZ\"", 2), ("FOOBAR = import(models, FOOBAR)", 3), ("service.doThing(FOOBAR)", 5))
}

"flow from global variable defined in imported file and used as field access to `print`" in {
val cpg = code("""
|import models
|print(models.FOOBAR)
|""".stripMargin)
.moreCode(
"""
|FOOBAR = "XYZ"
|""".stripMargin,
fileName = "models.py"
)

def sink = cpg.call("print").argument.argumentIndex(1)
def source = cpg.literal("\"XYZ\"")
val List(flow) = sink.reachableByFlows(source).map(flowToResultPairs).distinct.sortBy(_.length).l
flow shouldBe List(("FOOBAR = \"XYZ\"", 2), ("models = import(, models)", 2), ("print(models.FOOBAR)", 3))
}

}

class RegexDefinedFlowsDataFlowTests
Expand Down

0 comments on commit d6e9e81

Please sign in to comment.